diff --git a/src/ripple/peerfinder/impl/LegacyEndpoint.h b/src/ripple/peerfinder/impl/LegacyEndpoint.h index 2d1632597..4480f36d3 100644 --- a/src/ripple/peerfinder/impl/LegacyEndpoint.h +++ b/src/ripple/peerfinder/impl/LegacyEndpoint.h @@ -32,11 +32,15 @@ struct LegacyEndpoint LegacyEndpoint (IPEndpoint const& address_) : address (address_) + , whenInserted (RelativeTime::fromStartup()) { } IPEndpoint address; - // When we last gave the endpoint out for connection attempts + // When we inserted the endpoint into the cache + RelativeTime mutable whenInserted; + + // When we last used the endpoint for outging connection attempts RelativeTime mutable lastGet; // True if we ever tried to connect diff --git a/src/ripple/peerfinder/impl/LegacyEndpointCache.h b/src/ripple/peerfinder/impl/LegacyEndpointCache.h index 7971bc5b5..50b143c98 100644 --- a/src/ripple/peerfinder/impl/LegacyEndpointCache.h +++ b/src/ripple/peerfinder/impl/LegacyEndpointCache.h @@ -26,6 +26,9 @@ namespace PeerFinder { /** A container for managing the cache of legacy endpoints. */ class LegacyEndpointCache { +public: + typedef std::vector FlattenedList; + private: typedef boost::multi_index_container < LegacyEndpoint, boost::multi_index::indexed_by < @@ -36,11 +39,114 @@ private: > MapType; MapType m_map; + Store& m_store; + Journal m_journal; + int m_mutationCount; + + //-------------------------------------------------------------------------- + + /** Updates the database with the cache contents. */ + void update () + { + FlattenedList list (flatten()); + m_store.updateLegacyEndpoints (list); + m_journal.debug << "Updated " << list.size() << " legacy endpoints"; + } + + /** Increments the mutation count and updates the database if needed. */ + void mutate () + { + if (++m_mutationCount >= legacyEndpointMutationsPerUpdate) + { + update(); + m_mutationCount = 0; + } + } + + /** Returns a flattened array of pointers to the legacy endpoints. */ + FlattenedList flatten () const + { + FlattenedList list; + list.reserve (m_map.size()); + for (MapType::iterator iter (m_map.begin()); + iter != m_map.end(); ++iter) + list.push_back (&*iter); + return list; + } + + /** Prune comparison function, strict weak ordering on desirability. */ + struct PruneLess + { + static int checkedScore (LegacyEndpoint const& ep) + { + return ep.checked ? (ep.canAccept ? 2 : 1) : 0; + } + + bool operator() (LegacyEndpoint const* lhs, + LegacyEndpoint const* rhs) const + { + // prefer checked and canAccept + int const checkedCompare ( + checkedScore (*lhs) - checkedScore (*rhs)); + if (checkedCompare > 0) + return true; + else if (checkedScore < 0) + return false; + + // prefer newer entries + if (lhs->whenInserted > rhs->whenInserted) + return true; + else if (lhs->whenInserted < rhs->whenInserted) + return false; + + return false; + } + }; + + /** Sort endpoints by desirability and discard the bottom half. */ + void prune() + { + FlattenedList list (flatten()); + if (list.size () < 3) + return; + std::random_shuffle (list.begin(), list.end()); + std::sort (list.begin(), list.end(), PruneLess()); + FlattenedList::const_iterator pos (list.begin() + list.size()/2 + 1); + std::size_t const n (m_map.size() - (pos - list.begin())); + MapType map; + for (FlattenedList::const_iterator iter (list.begin()); + iter != pos; ++iter) + map.insert (**iter); + std::swap (map, m_map); + m_journal.info << "Pruned " << n << " legacy endpoints"; + mutate(); + } + + /** Get comparison function. */ + struct GetLess + { + bool operator() (LegacyEndpoint const* lhs, + LegacyEndpoint const* rhs) const + { + // Always prefer entries we tried longer ago. This should + // cycle through the entire cache before re-using an address + // for making a connection attempt. + // + if (lhs->lastGet < rhs->lastGet) + return true; + else if (lhs->lastGet > rhs->lastGet) + return false; + + // Fall back to the prune desirability comparison + return PruneLess() (lhs, rhs); + } + }; public: - typedef std::vector FlattenedList; - - LegacyEndpointCache () + LegacyEndpointCache (Store& store, Journal journal) + : m_store (store) + , m_journal (journal) + , m_mutationCount (0) { } @@ -48,6 +154,23 @@ public: { } + /** Load the legacy endpoints cache from the database. */ + void load () + { + typedef std::vector List; + List list; + m_store.loadLegacyEndpoints (list); + std::size_t n (0); + for (List::const_iterator iter (list.begin()); + iter != list.end(); ++iter) + { + std::pair result (insert (*iter)); + if (result.second) + ++n; + } + m_journal.debug << "Loaded " << n << " legacy endpoints"; + } + /** Attempt to insert the endpoint. The caller is responsible for making sure the address is valid. The return value provides a reference to the new or existing endpoint. @@ -57,10 +180,14 @@ public: { std::pair result ( m_map.insert (LegacyEndpoint (address))); + if (m_map.size() > legacyEndpointCacheSize) + prune(); + if (result.second) + mutate(); return std::make_pair (*result.first, result.second); } - /** Returns a pointer to the legacy endpoint or nullptr. */ + /** Returns a pointer to the legacy endpoint if it exists, else nullptr. */ LegacyEndpoint const* find (IPEndpoint const& address) { MapType::iterator iter (m_map.find (address)); @@ -79,24 +206,19 @@ public: { endpoint->checked = true; endpoint->canAccept = canAccept; + mutate(); } } - struct Compare - { - bool operator() (LegacyEndpoint const* lhs, - LegacyEndpoint const* rhs) const - { - return lhs->lastGet < rhs->lastGet; - } - }; - - /** Appends up to n addresses for establishing outbound peers. */ + /** Appends up to n addresses for establishing outbound peers. + Also updates the lastGet field of the LegacyEndpoint so we will avoid + re-using the address until we have tried all the others. + */ void get (std::size_t n, std::vector & result) const { FlattenedList list (flatten()); std::random_shuffle (list.begin(), list.end()); - std::sort (list.begin(), list.end(), Compare()); + std::sort (list.begin(), list.end(), GetLess()); n = std::min (n, list.size()); RelativeTime const now (RelativeTime::fromStartup()); for (FlattenedList::iterator iter (list.begin()); @@ -104,18 +226,7 @@ public: { result.push_back ((*iter)->address); (*iter)->lastGet = now; - } - } - - /** Returns a flattened array of pointers to the legacy endpoints. */ - FlattenedList flatten () const - { - FlattenedList list; - list.reserve (m_map.size()); - for (MapType::iterator iter (m_map.begin()); - iter != m_map.end(); ++iter) - list.push_back (&*iter); - return list; + } } }; diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index d42167bdf..60cd129a4 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -23,38 +23,6 @@ namespace ripple { namespace PeerFinder { -//-------------------------------------------------------------------------- - -/* -typedef boost::multi_index_container < - Endpoint, boost::multi_index::indexed_by < - - boost::multi_index::hashed_unique < - BOOST_MULTI_INDEX_MEMBER(PeerFinder::Endpoint,IPEndpoint,address)> - > -> EndpointCache; -*/ - -// Describes an Endpoint in the global Endpoint table -// This includes the Endpoint as well as some additional information -// -struct EndpointInfo -{ - Endpoint endpoint; -}; - -inline bool operator< (EndpointInfo const& lhs, EndpointInfo const& rhs) -{ - return lhs.endpoint < rhs.endpoint; -} - -inline bool operator== (EndpointInfo const& lhs, EndpointInfo const& rhs) -{ - return lhs.endpoint == rhs.endpoint; -} - -//-------------------------------------------------------------------------- - typedef boost::multi_index_container < PeerInfo, boost::multi_index::indexed_by < boost::multi_index::hashed_unique < @@ -105,6 +73,7 @@ public: Config m_config; // A list of dynamic sources consulted as a fallback + // VFALCO TODO Replace with SharedPtr std::vector > m_sources; // The current tally of peer slot statistics @@ -114,7 +83,6 @@ public: Peers m_peers; LegacyEndpointCache m_legacyCache; - bool m_legacyCacheDirty; //---------------------------------------------------------------------- @@ -127,7 +95,7 @@ public: , m_store (store) , m_checker (checker) , m_journal (journal) - , m_legacyCacheDirty (false) + , m_legacyCache (store, journal) { } @@ -137,20 +105,7 @@ public: // void load () { - typedef std::vector List; - List list; - m_store.loadLegacyEndpoints (list); - for (List::const_iterator iter (list.begin()); - iter != list.end(); ++iter) - m_legacyCache.insert (*iter); - m_journal.debug << "Loaded " << list.size() << " legacy endpoints"; - } - - // Called when a peer's id is unexpectedly not found - // - void peerNotFound (PeerID const& id) - { - m_journal.fatal << "Missing peer " << id; + m_legacyCache.load(); } // Returns a suitable Endpoint representing us. @@ -171,15 +126,23 @@ public: return ep; } + // Returns true if the IPEndpoint contains no invalid data. + // + bool validIPEndpoint (IPEndpoint const& address) + { + if (! address.isPublic()) + return false; + if (address.port() == 0) + return false; + return true; + } + // Returns true if the Endpoint contains no invalid data. // bool validEndpoint (Endpoint const& endpoint) { - if (! endpoint.address.isPublic()) - return false; - if (endpoint.port == 0) - return false; - return false; + return validIPEndpoint ( + endpoint.address.withPort (endpoint.port)); } // Prunes invalid endpoints from a list @@ -228,7 +191,9 @@ public: { if (m_slots.outDesired > m_slots.outboundCount) { - int const needed (m_slots.outDesired - m_slots.outboundCount); + int const needed (std::min ( + m_slots.outDesired - m_slots.outboundCount, + int (maxAddressesPerAttempt))); std::vector list; m_legacyCache.get (needed, list); @@ -242,9 +207,9 @@ public: // void fetch (Source& source) { -#if 0 m_journal.debug << "Fetching " << source.name(); +#if 0 Source::IPEndpoints endpoints; source.fetch (endpoints, m_journal); @@ -254,18 +219,19 @@ public: iter != endpoints.end(); ++iter) m_legacyCache->insert (*iter); - if (m_legacyCache->size() > (numberOfLegacyEndpoints/2)) + if (m_legacyCache->size() > (legacyEndpointCacheSize/2)) { m_legacyCache.swap(); m_legacyCache->clear(); } - - m_legacyCacheDirty = true; } #endif } //---------------------------------------------------------------------- + // + // Logic + // void setConfig (Config const& config) { @@ -289,6 +255,59 @@ public: m_journal.debug << "Processing Update"; } + //-------------------------------------------------------------------------- + // + // LegacyEndpoint + // + + // Completion handler for a LegacyEndpoint listening test. + // + void onCheckLegacyEndpoint (IPEndpoint const& endpoint, + Checker::Result const& result) + { + if (result.error == boost::asio::error::operation_aborted) + return; + + RelativeTime const now (RelativeTime::fromStartup()); + + if (! result.error) + { + if (result.canAccept) + m_journal.info << "Legacy address " << endpoint << + " passed listening test"; + else + m_journal.warning << "Legacy address " << endpoint << + " cannot accept incoming connections"; + } + else + { + m_journal.error << "Listening test for legacy address " << + endpoint << " failed: " << result.error.message(); + } + } + + void onPeerLegacyEndpoint (IPEndpoint const& address) + { + if (! validIPEndpoint (address)) + return; + std::pair result ( + m_legacyCache.insert (address)); + if (result.second) + { + // its new + m_journal.trace << "New legacy endpoint: " << address; + + // VFALCO NOTE Temporarily we are doing a check on each + // legacy endpoint to test the async code + // + m_checker.async_test (address, bind ( + &Logic::onCheckLegacyEndpoint, + this, address, _1)); + } + } + + //-------------------------------------------------------------------------- + // Send mtENDPOINTS for each peer as needed // void sendEndpoints () @@ -313,48 +332,41 @@ public: } } + // Called when a peer connection is established. + // We are guaranteed that the PeerID is not already in our map. + // void onPeerConnected (PeerID const& id, IPEndpoint const& address, bool inbound) { m_journal.debug << "Peer connected: " << address; - + // If this is outgoing, record the success + if (! inbound) + m_legacyCache.checked (address, true); std::pair result ( m_peers.insert ( PeerInfo (id, address, inbound))); - if (result.second) - { - //PeerInfo const& peer (*result.first); - m_slots.addPeer (m_config, inbound); - } - else - { - // already exists! - m_journal.error << "Duplicate peer " << id; - //m_callback.disconnectPeer (id); - } + bassert (result.second); + m_slots.addPeer (m_config, inbound); } + // Called when a peer is disconnected. + // We are guaranteed to get this exactly once for each + // corresponding call to onPeerConnected. + // void onPeerDisconnected (PeerID const& id) { Peers::iterator iter (m_peers.find (id)); - if (iter != m_peers.end()) - { - // found - PeerInfo const& peer (*iter); - m_journal.debug << "Peer disconnected: " << peer.address; - m_slots.dropPeer (m_config, peer.inbound); - m_peers.erase (iter); - } - else - { - m_journal.debug << "Peer disconnected: " << id; - peerNotFound (id); - } + bassert (iter != m_peers.end()); + PeerInfo const& peer (*iter); + m_journal.debug << "Peer disconnected: " << peer.address; + m_slots.dropPeer (m_config, peer.inbound); + m_peers.erase (iter); } // Called when the Checker completes a connectivity test // - void onCheckEndpoint (PeerID const& id, Checker::Result const& result) + void onCheckEndpoint (PeerID const& id, + IPEndpoint address, Checker::Result const& result) { if (result.error == boost::asio::error::operation_aborted) return; @@ -395,147 +407,73 @@ public: } } - // Called when the Checker completes a connectivity test for a legacy address - // - void onCheckLegacyEndpoint (IPEndpoint const& endpoint, - Checker::Result const& result) - { - if (result.error == boost::asio::error::operation_aborted) - return; - - RelativeTime const now (RelativeTime::fromStartup()); - - if (! result.error) - { - if (result.canAccept) - m_journal.info << "Legacy address " << endpoint << - " passed listening test"; - else - m_journal.warning << "Legacy address " << endpoint << - " cannot accept incoming connections"; - } - else - { - m_journal.error << "Listening test for legacy address " << - endpoint << " failed: " << result.error.message(); - } - } - // Processes a list of Endpoint received from a peer. // void onPeerEndpoints (PeerID const& id, std::vector endpoints) { - pruneEndpoints (endpoints); - + pruneEndpoints (endpoints); + Peers::iterator iter (m_peers.find (id)); - if (iter != m_peers.end()) + bassert (iter != m_peers.end()); + + RelativeTime const now (RelativeTime::fromStartup()); + PeerInfo const& peer (*iter); + + if (now >= peer.whenReceiveEndpoints) { - RelativeTime const now (RelativeTime::fromStartup()); - PeerInfo const& peer (*iter); + m_journal.debug << "Received " << endpoints.size() << + "Endpoint descriptors from " << peer.address; - if (now >= peer.whenReceiveEndpoints) + // We charge a load penalty if the peer sends us more than + // numberOfEndpoints peers in a single message + if (endpoints.size() > numberOfEndpoints) { - m_journal.debug << "Received " << endpoints.size() << - "Endpoint descriptors from " << peer.address; - - // We charge a load penalty if the peer sends us more than - // numberOfEndpoints peers in a single message - if (endpoints.size() > numberOfEndpoints) - { - m_journal.warning << "Charging peer " << peer.address << - " for sending too many endpoints"; + m_journal.warning << "Charging peer " << peer.address << + " for sending too many endpoints"; - m_callback.chargePeerLoadPenalty(id); - } + m_callback.chargePeerLoadPenalty(id); + } - // process the list + // process the list + { + bool foundZeroHops (false); + bool chargedPenalty (false); + for (std::vector ::const_iterator iter (endpoints.begin()); + iter != endpoints.end(); ++iter) { - bool foundZeroHops (false); - bool chargedPenalty (false); - for (std::vector ::const_iterator iter (endpoints.begin()); - iter != endpoints.end(); ++iter) + Endpoint const& endpoint (*iter); + if (endpoint.hops == 0) { - Endpoint const& endpoint (*iter); - if (endpoint.hops == 0) + if (! foundZeroHops) { - if (! foundZeroHops) - { - foundZeroHops = true; - m_checker.async_test (endpoint.address.withPort ( - endpoint.port), bind (&Logic::onCheckEndpoint, - this, id, _1)); - } - else if (! chargedPenalty) - { - // Only charge them once (?) - chargedPenalty = true; - // More than one zero-hops message?! - m_journal.warning << "Charging peer " << peer.address << - " for sending more than one hops==0 endpoint"; - m_callback.chargePeerLoadPenalty (id); - } + foundZeroHops = true; + IPEndpoint const address ( + endpoint.address.withPort (endpoint.port)); + m_checker.async_test (address, bind (&Logic::onCheckEndpoint, + this, id, address, _1)); + } + else if (! chargedPenalty) + { + // Only charge them once (?) + chargedPenalty = true; + // More than one zero-hops message?! + m_journal.warning << "Charging peer " << peer.address << + " for sending more than one hops==0 endpoint"; + m_callback.chargePeerLoadPenalty (id); } } } + } - peer.whenReceiveEndpoints = now + secondsPerEndpoints; - } - else - { - m_journal.warning << "Charging peer " << peer.address << - " for sending too quickly"; - m_callback.chargePeerLoadPenalty (id); - } + peer.whenReceiveEndpoints = now + secondsPerEndpoints; } else { - peerNotFound (id); + m_journal.warning << "Charging peer " << peer.address << + " for sending too quickly"; + m_callback.chargePeerLoadPenalty (id); } } - - void onPeerLegacyEndpoint (IPEndpoint const& ep) - { - // filter invalid addresses - if (! ep.isPublic()) - return; - - if (ep.port() == 0) - return; - - std::pair result ( - m_legacyCache.insert (ep)); - - if (result.second) - { - // its new - m_legacyCacheDirty = true; - m_journal.trace << "Legacy endpoint: " << ep; - - m_checker.async_test (ep, bind ( - &Logic::onCheckLegacyEndpoint, - this, ep, _1)); - } - } - - // Updates the Store with the current set of legacy endpoints - // - void storeLegacyEndpoints () - { - if (!m_legacyCacheDirty) - return; - -#if 0 - std::vector list; - - createLegacyEndpointList (list); - - m_journal.debug << "Updating " << list.size() << " legacy endpoints"; - - m_store.storeLegacyEndpoints (list); - - m_legacyCacheDirty = false; -#endif - } }; } diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index d7086f7ab..6cfd62335 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -189,7 +189,6 @@ public: Logic m_logic; DeadlineTimer m_connectTimer; DeadlineTimer m_endpointsTimer; - RelativeTime m_whenStoreLegacyEndpoints; //-------------------------------------------------------------------------- @@ -311,18 +310,6 @@ public: } } - // Checks to see if its time to update legacy endpoints - void storeLegacyEndpoints() - { - RelativeTime const now (RelativeTime::fromStartup()); - if (now >= m_whenStoreLegacyEndpoints) - { - m_logic.storeLegacyEndpoints (); - m_whenStoreLegacyEndpoints = now - + RelativeTime (legacyEndpointUpdateSeconds); - } - } - void init () { m_journal.debug << "Initializing"; @@ -355,14 +342,10 @@ public: { m_journal.debug << "Started"; - m_whenStoreLegacyEndpoints = RelativeTime::fromStartup() - + RelativeTime (legacyEndpointUpdateSeconds); - init (); while (! this->threadShouldExit()) { - storeLegacyEndpoints(); m_queue.run_one(); } diff --git a/src/ripple/peerfinder/impl/Store.h b/src/ripple/peerfinder/impl/Store.h index 828d8f279..042ec13f7 100644 --- a/src/ripple/peerfinder/impl/Store.h +++ b/src/ripple/peerfinder/impl/Store.h @@ -32,8 +32,8 @@ public: virtual void loadLegacyEndpoints ( std::vector & list) = 0; - virtual void storeLegacyEndpoints ( - std::vector const& list) = 0; + virtual void updateLegacyEndpoints ( + std::vector const& list) = 0; }; } diff --git a/src/ripple/peerfinder/impl/StoreSqdb.h b/src/ripple/peerfinder/impl/StoreSqdb.h index e0e88c29d..2af1fc696 100644 --- a/src/ripple/peerfinder/impl/StoreSqdb.h +++ b/src/ripple/peerfinder/impl/StoreSqdb.h @@ -100,10 +100,10 @@ public: } } - void storeLegacyEndpoints ( - std::vector const& list) + void updateLegacyEndpoints ( + std::vector const& list) { - typedef std::vector List; + typedef std::vector List; Error error; @@ -127,7 +127,7 @@ public: for (List::const_iterator iter (list.begin()); !error && iter != list.end(); ++iter) { - IPEndpoint const& ep (*iter); + IPEndpoint const& ep ((*iter)->address); s = ep.to_string(); st.execute_and_fetch (error); } diff --git a/src/ripple/peerfinder/impl/Tuning.h b/src/ripple/peerfinder/impl/Tuning.h index 4b5ba5abe..34422bf4b 100644 --- a/src/ripple/peerfinder/impl/Tuning.h +++ b/src/ripple/peerfinder/impl/Tuning.h @@ -26,23 +26,41 @@ namespace PeerFinder { // Tunable constants enum { + //--------------------------------------------------------- + // + // Connection policy settings + // + // How often we will try to make outgoing connections - secondsPerConnect = 10, + secondsPerConnect = 10 + + // The largest connections we will attempt simultaneously + ,maxAddressesPerAttempt = 30 + + //--------------------------------------------------------- + // + // Endpoint settings + // // How often we send or accept mtENDPOINTS messages per peer - secondsPerEndpoints = 5, + ,secondsPerEndpoints = 5 // How many Endpoint to send in each mtENDPOINTS - numberOfEndpoints = 10, + ,numberOfEndpoints = 10 // The most Endpoint we will accept in mtENDPOINTS - numberOfEndpointsMax = 20, + ,numberOfEndpointsMax = 20 + + //--------------------------------------------------------- + // + // LegacyEndpoint Settings + // // How many legacy endpoints to keep in our cache - numberOfLegacyEndpoints = 1000, + ,legacyEndpointCacheSize = 1000 - // How often legacy endpoints are updated in the database - legacyEndpointUpdateSeconds = 60 * 60 + // How many cache mutations between each database update + ,legacyEndpointMutationsPerUpdate = 50 }; } diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index 013568123..131e5d0a3 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -40,18 +40,17 @@ namespace ripple { using namespace beast; } +# include "impl/Tuning.h" # include "impl/Checker.h" #include "impl/CheckerAdapter.h" #include "impl/EndpointCache.h" #include "impl/Slots.h" #include "impl/Source.h" #include "impl/SourceStrings.h" - # include "impl/LegacyEndpoint.h" +# include "impl/Store.h" # include "impl/LegacyEndpointCache.h" # include "impl/PeerInfo.h" -# include "impl/Store.h" -# include "impl/Tuning.h" #include "impl/StoreSqdb.h" #include "impl/Logic.h"