LegacyEndpointCache work

This commit is contained in:
Vinnie Falco
2013-10-04 20:38:46 -07:00
parent bb29c8ba85
commit dc8420d32d
8 changed files with 317 additions and 264 deletions

View File

@@ -32,11 +32,15 @@ struct LegacyEndpoint
LegacyEndpoint (IPEndpoint const& address_)
: address (address_)
, whenInserted (RelativeTime::fromStartup())
{ }
IPEndpoint address;
// When we last gave the endpoint out for connection attempts
// When we inserted the endpoint into the cache
RelativeTime mutable whenInserted;
// When we last used the endpoint for outging connection attempts
RelativeTime mutable lastGet;
// True if we ever tried to connect

View File

@@ -26,6 +26,9 @@ namespace PeerFinder {
/** A container for managing the cache of legacy endpoints. */
class LegacyEndpointCache
{
public:
typedef std::vector <LegacyEndpoint const*> FlattenedList;
private:
typedef boost::multi_index_container <
LegacyEndpoint, boost::multi_index::indexed_by <
@@ -36,11 +39,114 @@ private:
> MapType;
MapType m_map;
Store& m_store;
Journal m_journal;
int m_mutationCount;
//--------------------------------------------------------------------------
/** Updates the database with the cache contents. */
void update ()
{
FlattenedList list (flatten());
m_store.updateLegacyEndpoints (list);
m_journal.debug << "Updated " << list.size() << " legacy endpoints";
}
/** Increments the mutation count and updates the database if needed. */
void mutate ()
{
if (++m_mutationCount >= legacyEndpointMutationsPerUpdate)
{
update();
m_mutationCount = 0;
}
}
/** Returns a flattened array of pointers to the legacy endpoints. */
FlattenedList flatten () const
{
FlattenedList list;
list.reserve (m_map.size());
for (MapType::iterator iter (m_map.begin());
iter != m_map.end(); ++iter)
list.push_back (&*iter);
return list;
}
/** Prune comparison function, strict weak ordering on desirability. */
struct PruneLess
{
static int checkedScore (LegacyEndpoint const& ep)
{
return ep.checked ? (ep.canAccept ? 2 : 1) : 0;
}
bool operator() (LegacyEndpoint const* lhs,
LegacyEndpoint const* rhs) const
{
// prefer checked and canAccept
int const checkedCompare (
checkedScore (*lhs) - checkedScore (*rhs));
if (checkedCompare > 0)
return true;
else if (checkedScore < 0)
return false;
// prefer newer entries
if (lhs->whenInserted > rhs->whenInserted)
return true;
else if (lhs->whenInserted < rhs->whenInserted)
return false;
return false;
}
};
/** Sort endpoints by desirability and discard the bottom half. */
void prune()
{
FlattenedList list (flatten());
if (list.size () < 3)
return;
std::random_shuffle (list.begin(), list.end());
std::sort (list.begin(), list.end(), PruneLess());
FlattenedList::const_iterator pos (list.begin() + list.size()/2 + 1);
std::size_t const n (m_map.size() - (pos - list.begin()));
MapType map;
for (FlattenedList::const_iterator iter (list.begin());
iter != pos; ++iter)
map.insert (**iter);
std::swap (map, m_map);
m_journal.info << "Pruned " << n << " legacy endpoints";
mutate();
}
/** Get comparison function. */
struct GetLess
{
bool operator() (LegacyEndpoint const* lhs,
LegacyEndpoint const* rhs) const
{
// Always prefer entries we tried longer ago. This should
// cycle through the entire cache before re-using an address
// for making a connection attempt.
//
if (lhs->lastGet < rhs->lastGet)
return true;
else if (lhs->lastGet > rhs->lastGet)
return false;
// Fall back to the prune desirability comparison
return PruneLess() (lhs, rhs);
}
};
public:
typedef std::vector <LegacyEndpoint const*> FlattenedList;
LegacyEndpointCache ()
LegacyEndpointCache (Store& store, Journal journal)
: m_store (store)
, m_journal (journal)
, m_mutationCount (0)
{
}
@@ -48,6 +154,23 @@ public:
{
}
/** Load the legacy endpoints cache from the database. */
void load ()
{
typedef std::vector <IPEndpoint> List;
List list;
m_store.loadLegacyEndpoints (list);
std::size_t n (0);
for (List::const_iterator iter (list.begin());
iter != list.end(); ++iter)
{
std::pair <LegacyEndpoint&, bool> result (insert (*iter));
if (result.second)
++n;
}
m_journal.debug << "Loaded " << n << " legacy endpoints";
}
/** Attempt to insert the endpoint.
The caller is responsible for making sure the address is valid.
The return value provides a reference to the new or existing endpoint.
@@ -57,10 +180,14 @@ public:
{
std::pair <MapType::iterator, bool> result (
m_map.insert (LegacyEndpoint (address)));
if (m_map.size() > legacyEndpointCacheSize)
prune();
if (result.second)
mutate();
return std::make_pair (*result.first, result.second);
}
/** Returns a pointer to the legacy endpoint or nullptr. */
/** Returns a pointer to the legacy endpoint if it exists, else nullptr. */
LegacyEndpoint const* find (IPEndpoint const& address)
{
MapType::iterator iter (m_map.find (address));
@@ -79,24 +206,19 @@ public:
{
endpoint->checked = true;
endpoint->canAccept = canAccept;
mutate();
}
}
struct Compare
{
bool operator() (LegacyEndpoint const* lhs,
LegacyEndpoint const* rhs) const
{
return lhs->lastGet < rhs->lastGet;
}
};
/** Appends up to n addresses for establishing outbound peers. */
/** Appends up to n addresses for establishing outbound peers.
Also updates the lastGet field of the LegacyEndpoint so we will avoid
re-using the address until we have tried all the others.
*/
void get (std::size_t n, std::vector <IPEndpoint>& result) const
{
FlattenedList list (flatten());
std::random_shuffle (list.begin(), list.end());
std::sort (list.begin(), list.end(), Compare());
std::sort (list.begin(), list.end(), GetLess());
n = std::min (n, list.size());
RelativeTime const now (RelativeTime::fromStartup());
for (FlattenedList::iterator iter (list.begin());
@@ -104,18 +226,7 @@ public:
{
result.push_back ((*iter)->address);
(*iter)->lastGet = now;
}
}
/** Returns a flattened array of pointers to the legacy endpoints. */
FlattenedList flatten () const
{
FlattenedList list;
list.reserve (m_map.size());
for (MapType::iterator iter (m_map.begin());
iter != m_map.end(); ++iter)
list.push_back (&*iter);
return list;
}
}
};

View File

@@ -23,38 +23,6 @@
namespace ripple {
namespace PeerFinder {
//--------------------------------------------------------------------------
/*
typedef boost::multi_index_container <
Endpoint, boost::multi_index::indexed_by <
boost::multi_index::hashed_unique <
BOOST_MULTI_INDEX_MEMBER(PeerFinder::Endpoint,IPEndpoint,address)>
>
> EndpointCache;
*/
// Describes an Endpoint in the global Endpoint table
// This includes the Endpoint as well as some additional information
//
struct EndpointInfo
{
Endpoint endpoint;
};
inline bool operator< (EndpointInfo const& lhs, EndpointInfo const& rhs)
{
return lhs.endpoint < rhs.endpoint;
}
inline bool operator== (EndpointInfo const& lhs, EndpointInfo const& rhs)
{
return lhs.endpoint == rhs.endpoint;
}
//--------------------------------------------------------------------------
typedef boost::multi_index_container <
PeerInfo, boost::multi_index::indexed_by <
boost::multi_index::hashed_unique <
@@ -105,6 +73,7 @@ public:
Config m_config;
// A list of dynamic sources consulted as a fallback
// VFALCO TODO Replace with SharedPtr <Source>
std::vector <ScopedPointer <Source> > m_sources;
// The current tally of peer slot statistics
@@ -114,7 +83,6 @@ public:
Peers m_peers;
LegacyEndpointCache m_legacyCache;
bool m_legacyCacheDirty;
//----------------------------------------------------------------------
@@ -127,7 +95,7 @@ public:
, m_store (store)
, m_checker (checker)
, m_journal (journal)
, m_legacyCacheDirty (false)
, m_legacyCache (store, journal)
{
}
@@ -137,20 +105,7 @@ public:
//
void load ()
{
typedef std::vector <IPEndpoint> List;
List list;
m_store.loadLegacyEndpoints (list);
for (List::const_iterator iter (list.begin());
iter != list.end(); ++iter)
m_legacyCache.insert (*iter);
m_journal.debug << "Loaded " << list.size() << " legacy endpoints";
}
// Called when a peer's id is unexpectedly not found
//
void peerNotFound (PeerID const& id)
{
m_journal.fatal << "Missing peer " << id;
m_legacyCache.load();
}
// Returns a suitable Endpoint representing us.
@@ -171,15 +126,23 @@ public:
return ep;
}
// Returns true if the IPEndpoint contains no invalid data.
//
bool validIPEndpoint (IPEndpoint const& address)
{
if (! address.isPublic())
return false;
if (address.port() == 0)
return false;
return true;
}
// Returns true if the Endpoint contains no invalid data.
//
bool validEndpoint (Endpoint const& endpoint)
{
if (! endpoint.address.isPublic())
return false;
if (endpoint.port == 0)
return false;
return false;
return validIPEndpoint (
endpoint.address.withPort (endpoint.port));
}
// Prunes invalid endpoints from a list
@@ -228,7 +191,9 @@ public:
{
if (m_slots.outDesired > m_slots.outboundCount)
{
int const needed (m_slots.outDesired - m_slots.outboundCount);
int const needed (std::min (
m_slots.outDesired - m_slots.outboundCount,
int (maxAddressesPerAttempt)));
std::vector <IPEndpoint> list;
m_legacyCache.get (needed, list);
@@ -242,9 +207,9 @@ public:
//
void fetch (Source& source)
{
#if 0
m_journal.debug << "Fetching " << source.name();
#if 0
Source::IPEndpoints endpoints;
source.fetch (endpoints, m_journal);
@@ -254,18 +219,19 @@ public:
iter != endpoints.end(); ++iter)
m_legacyCache->insert (*iter);
if (m_legacyCache->size() > (numberOfLegacyEndpoints/2))
if (m_legacyCache->size() > (legacyEndpointCacheSize/2))
{
m_legacyCache.swap();
m_legacyCache->clear();
}
m_legacyCacheDirty = true;
}
#endif
}
//----------------------------------------------------------------------
//
// Logic
//
void setConfig (Config const& config)
{
@@ -289,6 +255,59 @@ public:
m_journal.debug << "Processing Update";
}
//--------------------------------------------------------------------------
//
// LegacyEndpoint
//
// Completion handler for a LegacyEndpoint listening test.
//
void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
RelativeTime const now (RelativeTime::fromStartup());
if (! result.error)
{
if (result.canAccept)
m_journal.info << "Legacy address " << endpoint <<
" passed listening test";
else
m_journal.warning << "Legacy address " << endpoint <<
" cannot accept incoming connections";
}
else
{
m_journal.error << "Listening test for legacy address " <<
endpoint << " failed: " << result.error.message();
}
}
void onPeerLegacyEndpoint (IPEndpoint const& address)
{
if (! validIPEndpoint (address))
return;
std::pair <LegacyEndpoint&, bool> result (
m_legacyCache.insert (address));
if (result.second)
{
// its new
m_journal.trace << "New legacy endpoint: " << address;
// VFALCO NOTE Temporarily we are doing a check on each
// legacy endpoint to test the async code
//
m_checker.async_test (address, bind (
&Logic::onCheckLegacyEndpoint,
this, address, _1));
}
}
//--------------------------------------------------------------------------
// Send mtENDPOINTS for each peer as needed
//
void sendEndpoints ()
@@ -313,48 +332,41 @@ public:
}
}
// Called when a peer connection is established.
// We are guaranteed that the PeerID is not already in our map.
//
void onPeerConnected (PeerID const& id,
IPEndpoint const& address, bool inbound)
{
m_journal.debug << "Peer connected: " << address;
// If this is outgoing, record the success
if (! inbound)
m_legacyCache.checked (address, true);
std::pair <Peers::iterator, bool> result (
m_peers.insert (
PeerInfo (id, address, inbound)));
if (result.second)
{
//PeerInfo const& peer (*result.first);
m_slots.addPeer (m_config, inbound);
}
else
{
// already exists!
m_journal.error << "Duplicate peer " << id;
//m_callback.disconnectPeer (id);
}
bassert (result.second);
m_slots.addPeer (m_config, inbound);
}
// Called when a peer is disconnected.
// We are guaranteed to get this exactly once for each
// corresponding call to onPeerConnected.
//
void onPeerDisconnected (PeerID const& id)
{
Peers::iterator iter (m_peers.find (id));
if (iter != m_peers.end())
{
// found
PeerInfo const& peer (*iter);
m_journal.debug << "Peer disconnected: " << peer.address;
m_slots.dropPeer (m_config, peer.inbound);
m_peers.erase (iter);
}
else
{
m_journal.debug << "Peer disconnected: " << id;
peerNotFound (id);
}
bassert (iter != m_peers.end());
PeerInfo const& peer (*iter);
m_journal.debug << "Peer disconnected: " << peer.address;
m_slots.dropPeer (m_config, peer.inbound);
m_peers.erase (iter);
}
// Called when the Checker completes a connectivity test
//
void onCheckEndpoint (PeerID const& id, Checker::Result const& result)
void onCheckEndpoint (PeerID const& id,
IPEndpoint address, Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
@@ -395,147 +407,73 @@ public:
}
}
// Called when the Checker completes a connectivity test for a legacy address
//
void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
RelativeTime const now (RelativeTime::fromStartup());
if (! result.error)
{
if (result.canAccept)
m_journal.info << "Legacy address " << endpoint <<
" passed listening test";
else
m_journal.warning << "Legacy address " << endpoint <<
" cannot accept incoming connections";
}
else
{
m_journal.error << "Listening test for legacy address " <<
endpoint << " failed: " << result.error.message();
}
}
// Processes a list of Endpoint received from a peer.
//
void onPeerEndpoints (PeerID const& id, std::vector <Endpoint> endpoints)
{
pruneEndpoints (endpoints);
pruneEndpoints (endpoints);
Peers::iterator iter (m_peers.find (id));
if (iter != m_peers.end())
bassert (iter != m_peers.end());
RelativeTime const now (RelativeTime::fromStartup());
PeerInfo const& peer (*iter);
if (now >= peer.whenReceiveEndpoints)
{
RelativeTime const now (RelativeTime::fromStartup());
PeerInfo const& peer (*iter);
m_journal.debug << "Received " << endpoints.size() <<
"Endpoint descriptors from " << peer.address;
if (now >= peer.whenReceiveEndpoints)
// We charge a load penalty if the peer sends us more than
// numberOfEndpoints peers in a single message
if (endpoints.size() > numberOfEndpoints)
{
m_journal.debug << "Received " << endpoints.size() <<
"Endpoint descriptors from " << peer.address;
// We charge a load penalty if the peer sends us more than
// numberOfEndpoints peers in a single message
if (endpoints.size() > numberOfEndpoints)
{
m_journal.warning << "Charging peer " << peer.address <<
" for sending too many endpoints";
m_journal.warning << "Charging peer " << peer.address <<
" for sending too many endpoints";
m_callback.chargePeerLoadPenalty(id);
}
m_callback.chargePeerLoadPenalty(id);
}
// process the list
// process the list
{
bool foundZeroHops (false);
bool chargedPenalty (false);
for (std::vector <Endpoint>::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
{
bool foundZeroHops (false);
bool chargedPenalty (false);
for (std::vector <Endpoint>::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
Endpoint const& endpoint (*iter);
if (endpoint.hops == 0)
{
Endpoint const& endpoint (*iter);
if (endpoint.hops == 0)
if (! foundZeroHops)
{
if (! foundZeroHops)
{
foundZeroHops = true;
m_checker.async_test (endpoint.address.withPort (
endpoint.port), bind (&Logic::onCheckEndpoint,
this, id, _1));
}
else if (! chargedPenalty)
{
// Only charge them once (?)
chargedPenalty = true;
// More than one zero-hops message?!
m_journal.warning << "Charging peer " << peer.address <<
" for sending more than one hops==0 endpoint";
m_callback.chargePeerLoadPenalty (id);
}
foundZeroHops = true;
IPEndpoint const address (
endpoint.address.withPort (endpoint.port));
m_checker.async_test (address, bind (&Logic::onCheckEndpoint,
this, id, address, _1));
}
else if (! chargedPenalty)
{
// Only charge them once (?)
chargedPenalty = true;
// More than one zero-hops message?!
m_journal.warning << "Charging peer " << peer.address <<
" for sending more than one hops==0 endpoint";
m_callback.chargePeerLoadPenalty (id);
}
}
}
}
peer.whenReceiveEndpoints = now + secondsPerEndpoints;
}
else
{
m_journal.warning << "Charging peer " << peer.address <<
" for sending too quickly";
m_callback.chargePeerLoadPenalty (id);
}
peer.whenReceiveEndpoints = now + secondsPerEndpoints;
}
else
{
peerNotFound (id);
m_journal.warning << "Charging peer " << peer.address <<
" for sending too quickly";
m_callback.chargePeerLoadPenalty (id);
}
}
void onPeerLegacyEndpoint (IPEndpoint const& ep)
{
// filter invalid addresses
if (! ep.isPublic())
return;
if (ep.port() == 0)
return;
std::pair <LegacyEndpoint&, bool> result (
m_legacyCache.insert (ep));
if (result.second)
{
// its new
m_legacyCacheDirty = true;
m_journal.trace << "Legacy endpoint: " << ep;
m_checker.async_test (ep, bind (
&Logic::onCheckLegacyEndpoint,
this, ep, _1));
}
}
// Updates the Store with the current set of legacy endpoints
//
void storeLegacyEndpoints ()
{
if (!m_legacyCacheDirty)
return;
#if 0
std::vector <IPEndpoint> list;
createLegacyEndpointList (list);
m_journal.debug << "Updating " << list.size() << " legacy endpoints";
m_store.storeLegacyEndpoints (list);
m_legacyCacheDirty = false;
#endif
}
};
}

View File

@@ -189,7 +189,6 @@ public:
Logic m_logic;
DeadlineTimer m_connectTimer;
DeadlineTimer m_endpointsTimer;
RelativeTime m_whenStoreLegacyEndpoints;
//--------------------------------------------------------------------------
@@ -311,18 +310,6 @@ public:
}
}
// Checks to see if its time to update legacy endpoints
void storeLegacyEndpoints()
{
RelativeTime const now (RelativeTime::fromStartup());
if (now >= m_whenStoreLegacyEndpoints)
{
m_logic.storeLegacyEndpoints ();
m_whenStoreLegacyEndpoints = now
+ RelativeTime (legacyEndpointUpdateSeconds);
}
}
void init ()
{
m_journal.debug << "Initializing";
@@ -355,14 +342,10 @@ public:
{
m_journal.debug << "Started";
m_whenStoreLegacyEndpoints = RelativeTime::fromStartup()
+ RelativeTime (legacyEndpointUpdateSeconds);
init ();
while (! this->threadShouldExit())
{
storeLegacyEndpoints();
m_queue.run_one();
}

View File

@@ -32,8 +32,8 @@ public:
virtual void loadLegacyEndpoints (
std::vector <IPEndpoint>& list) = 0;
virtual void storeLegacyEndpoints (
std::vector <IPEndpoint> const& list) = 0;
virtual void updateLegacyEndpoints (
std::vector <LegacyEndpoint const*> const& list) = 0;
};
}

View File

@@ -100,10 +100,10 @@ public:
}
}
void storeLegacyEndpoints (
std::vector <IPEndpoint> const& list)
void updateLegacyEndpoints (
std::vector <LegacyEndpoint const*> const& list)
{
typedef std::vector <IPEndpoint> List;
typedef std::vector <LegacyEndpoint const*> List;
Error error;
@@ -127,7 +127,7 @@ public:
for (List::const_iterator iter (list.begin());
!error && iter != list.end(); ++iter)
{
IPEndpoint const& ep (*iter);
IPEndpoint const& ep ((*iter)->address);
s = ep.to_string();
st.execute_and_fetch (error);
}

View File

@@ -26,23 +26,41 @@ namespace PeerFinder {
// Tunable constants
enum
{
//---------------------------------------------------------
//
// Connection policy settings
//
// How often we will try to make outgoing connections
secondsPerConnect = 10,
secondsPerConnect = 10
// The largest connections we will attempt simultaneously
,maxAddressesPerAttempt = 30
//---------------------------------------------------------
//
// Endpoint settings
//
// How often we send or accept mtENDPOINTS messages per peer
secondsPerEndpoints = 5,
,secondsPerEndpoints = 5
// How many Endpoint to send in each mtENDPOINTS
numberOfEndpoints = 10,
,numberOfEndpoints = 10
// The most Endpoint we will accept in mtENDPOINTS
numberOfEndpointsMax = 20,
,numberOfEndpointsMax = 20
//---------------------------------------------------------
//
// LegacyEndpoint Settings
//
// How many legacy endpoints to keep in our cache
numberOfLegacyEndpoints = 1000,
,legacyEndpointCacheSize = 1000
// How often legacy endpoints are updated in the database
legacyEndpointUpdateSeconds = 60 * 60
// How many cache mutations between each database update
,legacyEndpointMutationsPerUpdate = 50
};
}

View File

@@ -40,18 +40,17 @@ namespace ripple {
using namespace beast;
}
# include "impl/Tuning.h"
# include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
#include "impl/EndpointCache.h"
#include "impl/Slots.h"
#include "impl/Source.h"
#include "impl/SourceStrings.h"
# include "impl/LegacyEndpoint.h"
# include "impl/Store.h"
# include "impl/LegacyEndpointCache.h"
# include "impl/PeerInfo.h"
# include "impl/Store.h"
# include "impl/Tuning.h"
#include "impl/StoreSqdb.h"
#include "impl/Logic.h"