From 72681fa7fb036b67ee6c980626f6d78698c19e07 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sat, 5 Oct 2013 11:59:17 -0700 Subject: [PATCH] PeerFinder work --- Builds/VisualStudio2012/RippleD.vcxproj | 4 +- .../VisualStudio2012/RippleD.vcxproj.filters | 12 +-- .../impl/{EndpointCache.cpp => Cache.cpp} | 0 .../impl/{EndpointCache.h => Cache.h} | 45 ++++++++-- src/ripple/peerfinder/impl/CachedEndpoint.h | 17 +--- src/ripple/peerfinder/impl/Logic.h | 82 ++++++++++++------- src/ripple/peerfinder/impl/Manager.cpp | 23 ++++-- src/ripple/peerfinder/impl/PeerInfo.h | 10 ++- src/ripple/peerfinder/impl/Tuning.h | 4 +- src/ripple/peerfinder/ripple_peerfinder.cpp | 8 +- src/ripple_app/peers/Peers.cpp | 6 +- 11 files changed, 136 insertions(+), 75 deletions(-) rename src/ripple/peerfinder/impl/{EndpointCache.cpp => Cache.cpp} (100%) rename src/ripple/peerfinder/impl/{EndpointCache.h => Cache.h} (56%) diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index 8a19dd34f..783b1d0e9 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -90,7 +90,7 @@ true true - + true true true @@ -1644,7 +1644,7 @@ - + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index 91d84bbae..7dfd9629f 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -1080,9 +1080,6 @@ [1] Ripple\peerfinder\impl - - [1] Ripple\peerfinder\impl - [1] Ripple\peerfinder\impl @@ -1092,6 +1089,9 @@ [1] Ripple\peerfinder\impl + + [1] Ripple\peerfinder\impl + @@ -2175,9 +2175,6 @@ [1] Ripple\peerfinder\impl - - [1] Ripple\peerfinder\impl - [1] Ripple\peerfinder\impl @@ -2217,6 +2214,9 @@ [1] Ripple\peerfinder\impl + + [1] Ripple\peerfinder\impl + [1] Ripple\peerfinder\impl diff --git a/src/ripple/peerfinder/impl/EndpointCache.cpp b/src/ripple/peerfinder/impl/Cache.cpp similarity index 100% rename from src/ripple/peerfinder/impl/EndpointCache.cpp rename to src/ripple/peerfinder/impl/Cache.cpp diff --git a/src/ripple/peerfinder/impl/EndpointCache.h b/src/ripple/peerfinder/impl/Cache.h similarity index 56% rename from src/ripple/peerfinder/impl/EndpointCache.h rename to src/ripple/peerfinder/impl/Cache.h index ee644ec54..aee3ca624 100644 --- a/src/ripple/peerfinder/impl/EndpointCache.h +++ b/src/ripple/peerfinder/impl/Cache.h @@ -25,7 +25,7 @@ namespace PeerFinder { /** The Endpoint cache holds the short-lived relayed Endpoint messages. */ -class EndpointCache +class Cache { private: typedef boost::unordered_map < @@ -36,21 +36,52 @@ private: Table m_now; Table m_prev; + // Refresh the existing entry with a new message + void refresh (CachedEndpoint& entry, Endpoint const& message) + { + entry.message.hops = std::min (entry.message.hops, message.hops); + + // Copy the other fields based on uptime + if (entry.message.uptimeMinutes < message.uptimeMinutes) + { + entry.message.incomingSlotsAvailable = message.incomingSlotsAvailable; + entry.message.incomingSlotsMax = message.incomingSlotsMax; + entry.message.uptimeMinutes = message.uptimeMinutes; + entry.message.featureList = message.featureList; + } + } + public: - explicit EndpointCache (Journal journal) + explicit Cache (Journal journal) : m_journal (journal) { } - ~EndpointCache () + ~Cache () { } - // Insert or update an existing entry with the new message - // - void update (Endpoint const& ep) + // Cycle the tables + void cycle() { - + std::swap (m_now, m_prev); + m_now.clear(); + } + + // Insert or update an existing entry with the new message + void insert (Endpoint const& message) + { + Table::iterator iter (m_prev.find (message.address)); + if (iter != m_prev.end()) + { + } + else + { + std::pair result ( + m_now.emplace (message.address, message)); + if (!result.second) + refresh (result.first->second, message); + } } }; diff --git a/src/ripple/peerfinder/impl/CachedEndpoint.h b/src/ripple/peerfinder/impl/CachedEndpoint.h index f70678e1a..2ea0fae6a 100644 --- a/src/ripple/peerfinder/impl/CachedEndpoint.h +++ b/src/ripple/peerfinder/impl/CachedEndpoint.h @@ -25,23 +25,12 @@ namespace PeerFinder { struct CachedEndpoint { - CachedEndpoint (Endpoint const& endpoint) - : hops (endpoint.hops) - , incomingSlotsAvailable (endpoint.incomingSlotsAvailable) - , incomingSlotsMax (endpoint.incomingSlotsMax) - , uptimeMinutes (endpoint.uptimeMinutes) - , featureList (endpoint.featureList) + CachedEndpoint (Endpoint const& message_) + : message (message_) { } - int hops; - uint32 incomingSlotsAvailable; - uint32 incomingSlotsMax; - uint32 uptimeMinutes; - std::string featureList; - - // The peer closest to the endpoint, measured in hops. - PeerID origin; + Endpoint message; }; } diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index dc85f4ea2..4bb8ca873 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -92,7 +92,7 @@ public: // Our view of the current set of connected peers. Peers m_peers; - EndpointCache m_cache; + Cache m_cache; LegacyEndpointCache m_legacyCache; @@ -204,9 +204,14 @@ public: m_sources.push_back (source); } - void onUpdate () + // Called periodically to cycle and age the varioous caches. + // + void cycleCache() { - m_journal.debug << "Processing Update"; + m_cache.cycle(); + for (Peers::iterator iter (m_peers.begin()); + iter != m_peers.end(); ++iter) + iter->received.cycle(); } // Called when a peer connection is established. @@ -301,7 +306,7 @@ public: { if (! m_peers.empty()) { - m_journal.debug << "Sending mtENDPOINTS"; + m_journal.trace << "Sending mtENDPOINTS"; RelativeTime const now (RelativeTime::fromStartup()); @@ -313,7 +318,7 @@ public: { sendEndpoints (peer); peer.whenSendEndpoints = now + - RelativeTime (secondsPerEndpoints); + RelativeTime (secondsPerMessage); } } } @@ -387,43 +392,64 @@ public: { m_journal.warning << "Charging peer " << peer.address << " for sending too many endpoints"; - + m_callback.chargePeerLoadPenalty(id); } - // process the list + // Process each entry + // + int neighborCount (0); + for (std::vector ::const_iterator iter (list.begin()); + iter != list.end(); ++iter) { - bool foundNeighbor (false); - bool chargedPenalty (false); - for (std::vector ::const_iterator iter (list.begin()); - iter != list.end(); ++iter) + Endpoint const& message (*iter); + + // Remember that this peer gave us this address + peer.received.insert (message.address); + + if (message.hops == 0) { - Endpoint const& endpoint (*iter); - if (endpoint.hops == 0) + ++neighborCount; + if (neighborCount == 1) { - if (! foundNeighbor) + if (! peer.checked) { - foundNeighbor = true; - // Test the peer's listening port if its the first time - if (! peer.checked) - m_checker.async_test (endpoint.address, bind ( - &Logic::onCheckEndpoint, this, id, - endpoint.address, _1)); + // Test the peer's listening port before + // adding it to the cache for the first time. + // + m_checker.async_test (message.address, bind ( + &Logic::onCheckEndpoint, this, id, + message.address, _1)); + + // Note that we simply discard the first Endpoint + // that the neighbor sends when we perform the + // listening test. They will just send us another + // one in a few seconds. } - else if (! chargedPenalty) + else if (peer.canAccept) { - // Only charge them once (?) - chargedPenalty = true; - // More than one zero-hops message?! - m_journal.warning << "Charging peer " << peer.address << - " for sending more than one hops==0 endpoint"; - m_callback.chargePeerLoadPenalty (id); + // We only add to the cache if the neighbor passed the + // listening test, else we silently drop their message + // since their listening port is misconfigured. + // + m_cache.insert (message); } } } + else + { + m_cache.insert (message); + } } - peer.whenAcceptEndpoints = now + secondsPerEndpoints; + if (neighborCount > 1) + { + m_journal.warning << "Peer " << peer.address << + " sent " << neighborCount << " entries with hops=0"; + // VFALCO TODO Should we apply load charges? + } + + peer.whenAcceptEndpoints = now + secondsPerMessage; } //-------------------------------------------------------------------------- diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index 514f40644..d4d1cefa8 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -188,7 +188,8 @@ public: CheckerAdapter m_checker; Logic m_logic; DeadlineTimer m_connectTimer; - DeadlineTimer m_endpointsTimer; + DeadlineTimer m_messageTimer; + DeadlineTimer m_cacheTimer; //-------------------------------------------------------------------------- @@ -200,7 +201,8 @@ public: , m_checker (m_queue) , m_logic (callback, m_store, m_checker, journal) , m_connectTimer (this) - , m_endpointsTimer (this) + , m_messageTimer (this) + , m_cacheTimer (this) { #if 1 #if BEAST_MSVC @@ -283,7 +285,8 @@ public: m_checker.cancel (); m_logic.stop (); m_connectTimer.cancel(); - m_endpointsTimer.cancel(); + m_messageTimer.cancel(); + m_cacheTimer.cancel(); m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this)); } @@ -296,10 +299,15 @@ public: m_queue.dispatch (bind (&Logic::makeOutgoingConnections, &m_logic)); m_connectTimer.setExpiration (secondsPerConnect); } - else if (timer == m_endpointsTimer) + else if (timer == m_messageTimer) { m_queue.dispatch (bind (&Logic::sendEndpoints, &m_logic)); - m_endpointsTimer.setExpiration (secondsPerEndpoints); + m_messageTimer.setExpiration (secondsPerMessage); + } + else if (timer == m_cacheTimer) + { + m_queue.dispatch (bind (&Logic::cycleCache, &m_logic)); + m_cacheTimer.setExpiration (cacheSecondsToLive); } } @@ -326,8 +334,9 @@ public: } m_connectTimer.setExpiration (secondsPerConnect); - m_endpointsTimer.setExpiration (secondsPerEndpoints); - + m_messageTimer.setExpiration (secondsPerMessage); + m_cacheTimer.setExpiration (cacheSecondsToLive); + m_queue.post (bind (&Logic::makeOutgoingConnections, &m_logic)); } diff --git a/src/ripple/peerfinder/impl/PeerInfo.h b/src/ripple/peerfinder/impl/PeerInfo.h index 34f7c5251..d3a48fcb5 100644 --- a/src/ripple/peerfinder/impl/PeerInfo.h +++ b/src/ripple/peerfinder/impl/PeerInfo.h @@ -23,7 +23,7 @@ namespace ripple { namespace PeerFinder { -typedef AgedHistory > Endpoints; +//typedef AgedHistory > Endpoints; //-------------------------------------------------------------------------- @@ -64,8 +64,12 @@ struct PeerInfo // RelativeTime mutable whenAcceptEndpoints; - // All the Endpoint records we have received from this peer - Endpoints mutable endpoints; + // The set of all recent IPEndpoint that we have seen from this peer. + // We try to avoid sending a peer the same addresses they gave us. + // + CycledSet mutable received; }; } diff --git a/src/ripple/peerfinder/impl/Tuning.h b/src/ripple/peerfinder/impl/Tuning.h index 84874d4ea..f74213e2c 100644 --- a/src/ripple/peerfinder/impl/Tuning.h +++ b/src/ripple/peerfinder/impl/Tuning.h @@ -43,7 +43,7 @@ enum // // How often we send or accept mtENDPOINTS messages per peer - ,secondsPerEndpoints = 5 + ,secondsPerMessage = 5 // How many Endpoint to send in each mtENDPOINTS ,numberOfEndpoints = 10 @@ -53,7 +53,7 @@ enum // How long an Endpoint will stay in the cache // This should be a small multiple of the broadcast frequency - ,cachedEndpointSecondsToLive = 60 + ,cacheSecondsToLive = 60 //--------------------------------------------------------- // diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index bcb8125e8..52852d599 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -21,7 +21,7 @@ #include "ripple_peerfinder.h" -#include "../../ripple/types/api/AgedHistory.h" +#include "../../ripple/types/ripple_types.h" #include @@ -45,8 +45,8 @@ using namespace beast; # include "impl/Tuning.h" # include "impl/Checker.h" #include "impl/CheckerAdapter.h" -# include "impl/CachedEndpoint.h" -#include "impl/EndpointCache.h" +# include "impl/CachedEndpoint.h" +#include "impl/Cache.h" #include "impl/Slots.h" #include "impl/Source.h" #include "impl/SourceStrings.h" @@ -60,7 +60,7 @@ using namespace beast; #include "impl/Checker.cpp" #include "impl/Config.cpp" #include "impl/Endpoint.cpp" -#include "impl/EndpointCache.cpp" +#include "impl/Cache.cpp" #include "impl/Manager.cpp" #include "impl/Slots.cpp" #include "impl/SourceStrings.cpp" diff --git a/src/ripple_app/peers/Peers.cpp b/src/ripple_app/peers/Peers.cpp index 7dd72a7d8..015f58019 100644 --- a/src/ripple_app/peers/Peers.cpp +++ b/src/ripple_app/peers/Peers.cpp @@ -112,8 +112,10 @@ public: void preparePeerFinder() { PeerFinder::Config config; - - // config.maxPeerCount = ? + +#if RIPPLE_USE_PEERFINDER + config.maxPeerCount = 100; +#endif config.wantIncoming = (! getConfig ().PEER_PRIVATE) &&