From 82d8d9a09245acc3358a7714b3ab80b8f9075583 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 4 Oct 2013 21:29:13 -0700 Subject: [PATCH] PeerFinder work, Source fetch, show port in log --- Builds/VisualStudio2012/RippleD.vcxproj | 1 + .../VisualStudio2012/RippleD.vcxproj.filters | 3 + src/ripple/peerfinder/api/Endpoint.h | 1 - src/ripple/peerfinder/impl/CachedEndpoint.h | 50 ++ src/ripple/peerfinder/impl/Endpoint.cpp | 3 +- src/ripple/peerfinder/impl/EndpointCache.h | 34 +- .../peerfinder/impl/LegacyEndpointCache.h | 11 +- src/ripple/peerfinder/impl/Logic.h | 496 ++++++++++-------- src/ripple/peerfinder/impl/Manager.cpp | 17 +- src/ripple/peerfinder/impl/PeerInfo.h | 4 +- src/ripple/peerfinder/impl/Source.h | 24 +- src/ripple/peerfinder/impl/SourceStrings.cpp | 11 +- src/ripple/peerfinder/impl/SourceStrings.h | 4 +- src/ripple/peerfinder/impl/Tuning.h | 4 + src/ripple/peerfinder/ripple_peerfinder.cpp | 5 +- src/ripple_app/peers/Peer.cpp | 16 +- src/ripple_app/peers/Peers.cpp | 10 +- 17 files changed, 423 insertions(+), 271 deletions(-) create mode 100644 src/ripple/peerfinder/impl/CachedEndpoint.h diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index ae525e03d..8a19dd34f 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -1641,6 +1641,7 @@ + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index de9465722..91d84bbae 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -2217,6 +2217,9 @@ [1] Ripple\peerfinder\impl + + [1] Ripple\peerfinder\impl + diff --git a/src/ripple/peerfinder/api/Endpoint.h b/src/ripple/peerfinder/api/Endpoint.h index 074ce9f0c..7a7b2f311 100644 --- a/src/ripple/peerfinder/api/Endpoint.h +++ b/src/ripple/peerfinder/api/Endpoint.h @@ -29,7 +29,6 @@ struct Endpoint Endpoint (); IPEndpoint address; - uint16 port; int hops; uint32 incomingSlotsAvailable; uint32 incomingSlotsMax; diff --git a/src/ripple/peerfinder/impl/CachedEndpoint.h b/src/ripple/peerfinder/impl/CachedEndpoint.h new file mode 100644 index 000000000..f70678e1a --- /dev/null +++ b/src/ripple/peerfinder/impl/CachedEndpoint.h @@ -0,0 +1,50 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_CACHEDENDPOINT_H_INCLUDED +#define RIPPLE_PEERFINDER_CACHEDENDPOINT_H_INCLUDED + +namespace ripple { +namespace PeerFinder { + +struct CachedEndpoint +{ + CachedEndpoint (Endpoint const& endpoint) + : hops (endpoint.hops) + , incomingSlotsAvailable (endpoint.incomingSlotsAvailable) + , incomingSlotsMax (endpoint.incomingSlotsMax) + , uptimeMinutes (endpoint.uptimeMinutes) + , featureList (endpoint.featureList) + { + } + + int hops; + uint32 incomingSlotsAvailable; + uint32 incomingSlotsMax; + uint32 uptimeMinutes; + std::string featureList; + + // The peer closest to the endpoint, measured in hops. + PeerID origin; +}; + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Endpoint.cpp b/src/ripple/peerfinder/impl/Endpoint.cpp index 44979d1e4..25ad77c62 100644 --- a/src/ripple/peerfinder/impl/Endpoint.cpp +++ b/src/ripple/peerfinder/impl/Endpoint.cpp @@ -23,8 +23,7 @@ namespace ripple { namespace PeerFinder { Endpoint::Endpoint () - : port (0) - , hops (0) + : hops (0) , incomingSlotsAvailable (0) , incomingSlotsMax (0) , uptimeMinutes (0) diff --git a/src/ripple/peerfinder/impl/EndpointCache.h b/src/ripple/peerfinder/impl/EndpointCache.h index f3ae74b5c..ee644ec54 100644 --- a/src/ripple/peerfinder/impl/EndpointCache.h +++ b/src/ripple/peerfinder/impl/EndpointCache.h @@ -23,17 +23,35 @@ namespace ripple { namespace PeerFinder { -/** This container holds the master set of Endpoints. */ +/** The Endpoint cache holds the short-lived relayed Endpoint messages. +*/ class EndpointCache { -public: - EndpointCache (); - ~EndpointCache (); - - // Informs the cache we've received an endpoint. - void update (Endpoint const& ep); - private: + typedef boost::unordered_map < + IPEndpoint, CachedEndpoint, IPEndpoint::hasher> Table; + + Journal m_journal; + + Table m_now; + Table m_prev; + +public: + explicit EndpointCache (Journal journal) + : m_journal (journal) + { + } + + ~EndpointCache () + { + } + + // Insert or update an existing entry with the new message + // + void update (Endpoint const& ep) + { + + } }; } diff --git a/src/ripple/peerfinder/impl/LegacyEndpointCache.h b/src/ripple/peerfinder/impl/LegacyEndpointCache.h index 50b143c98..8c7bc8779 100644 --- a/src/ripple/peerfinder/impl/LegacyEndpointCache.h +++ b/src/ripple/peerfinder/impl/LegacyEndpointCache.h @@ -56,6 +56,10 @@ private: /** Increments the mutation count and updates the database if needed. */ void mutate () { + // This flag keeps us from updating while we are loading + if (m_mutationCount == -1) + return; + if (++m_mutationCount >= legacyEndpointMutationsPerUpdate) { update(); @@ -146,7 +150,7 @@ public: LegacyEndpointCache (Store& store, Journal journal) : m_store (store) , m_journal (journal) - , m_mutationCount (0) + , m_mutationCount (-1) { } @@ -164,11 +168,12 @@ public: for (List::const_iterator iter (list.begin()); iter != list.end(); ++iter) { - std::pair result (insert (*iter)); + std::pair result (insert (*iter)); if (result.second) ++n; } m_journal.debug << "Loaded " << n << " legacy endpoints"; + m_mutationCount = 0; } /** Attempt to insert the endpoint. @@ -176,7 +181,7 @@ public: The return value provides a reference to the new or existing endpoint. The bool indicates whether or not the insertion took place. */ - std::pair insert (IPEndpoint const& address) + std::pair insert (IPEndpoint const& address) { std::pair result ( m_map.insert (LegacyEndpoint (address))); diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 60cd129a4..dc85f4ea2 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -23,6 +23,19 @@ namespace ripple { namespace PeerFinder { +// Fresh endpoints are ones we have seen recently via mtENDPOINTS. +// These are best to give out to someone who needs additional +// connections as quickly as possible, since it is very likely +// that the fresh endpoints have open incoming slots. +// +// Reliable endpoints are ones which are highly likely to be +// connectible over long periods of time. They might not necessarily +// have an incoming slot, but they are good for bootstrapping when +// there are no peers yet. Typically these are what we would want +// to store in a database or local config file for a future launch. + +//------------------------------------------------------------------------------ + typedef boost::multi_index_container < PeerInfo, boost::multi_index::indexed_by < boost::multi_index::hashed_unique < @@ -46,25 +59,23 @@ public: struct State { State () - { - } + : stopping (false) + { } - // Fresh endpoints are ones we have seen recently via mtENDPOINTS. - // These are best to give out to someone who needs additional - // connections as quickly as possible, since it is very likely - // that the fresh endpoints have open incoming slots. - // - //EndpointCache fresh; + /** True if we are stopping. */ + bool stopping; - // Reliable endpoints are ones which are highly likely to be - // connectible over long periods of time. They might not necessarily - // have an incoming slot, but they are good for bootstrapping when - // there are no peers yet. Typically these are what we would want - // to store in a database or local config file for a future launch. - //Endpoints reliable; + /** The source we are currently fetching. + This is used to cancel I/O during program exit. + */ + SharedPtr fetchSource; }; - //---------------------------------------------------------------------- + typedef SharedData SharedState; + + SharedState m_state; + + //-------------------------------------------------------------------------- Callback& m_callback; Store& m_store; @@ -72,9 +83,8 @@ public: Journal m_journal; Config m_config; - // A list of dynamic sources consulted as a fallback - // VFALCO TODO Replace with SharedPtr - std::vector > m_sources; + // A list of dynamic sources to consult as a fallback + std::vector > m_sources; // The current tally of peer slot statistics Slots m_slots; @@ -82,9 +92,11 @@ public: // Our view of the current set of connected peers. Peers m_peers; + EndpointCache m_cache; + LegacyEndpointCache m_legacyCache; - //---------------------------------------------------------------------- + //-------------------------------------------------------------------------- Logic ( Callback& callback, @@ -95,11 +107,26 @@ public: , m_store (store) , m_checker (checker) , m_journal (journal) + , m_cache (journal) , m_legacyCache (store, journal) { } - //---------------------------------------------------------------------- + /** Stop the logic. + This will cancel the current fetch and set the stopping flag + to `true` to prevent further fetches. + Thread safety: + Safe to call from any thread. + */ + void stop () + { + SharedState::Access state (m_state); + state->stopping = true; + if (state->fetchSource != nullptr) + state->fetchSource->cancel (); + } + + //-------------------------------------------------------------------------- // Load persistent state information from the Store // @@ -116,8 +143,8 @@ public: bassert (m_config.wantIncoming); Endpoint ep; - // ep.address = ? - ep.port = m_config.listeningPort; + ep.address = IPEndpoint ( + IPEndpoint::V4 ()).withPort (m_config.listeningPort); ep.hops = 0; ep.incomingSlotsAvailable = m_slots.inboundSlots; ep.incomingSlotsMax = m_slots.inboundSlotsMaximum; @@ -137,54 +164,6 @@ public: return true; } - // Returns true if the Endpoint contains no invalid data. - // - bool validEndpoint (Endpoint const& endpoint) - { - return validIPEndpoint ( - endpoint.address.withPort (endpoint.port)); - } - - // Prunes invalid endpoints from a list - // - void pruneEndpoints (std::vector & list) - { - for (std::vector ::iterator iter (list.begin()); - iter != list.end(); ++iter) - { - while (! validEndpoint (*iter)) - { - m_journal.error << "Pruned invalid endpoint " << iter->address; - iter = list.erase (iter); - if (iter == list.end()) - break; - } - } - } - - // Send mtENDPOINTS for the specified peer - // - void sendEndpoints (PeerInfo const& peer) - { - typedef std::vector List; - std::vector endpoints; - - // fill in endpoints - - // Add us to the list if we want incoming - if (m_slots.inboundSlots > 0) - endpoints.push_back (thisEndpoint ()); - - if (! endpoints.empty()) - m_callback.sendPeerEndpoints (peer.id, endpoints); - } - - // Assembles a list from the legacy endpoint container - // - void createLegacyEndpointList (std::vector & list) - { - } - // Make outgoing connections to bring us up to desired out count // void makeOutgoingConnections () @@ -203,35 +182,11 @@ public: } } - // Fetch the list of IPEndpoint from the specified source - // - void fetch (Source& source) - { - m_journal.debug << "Fetching " << source.name(); - -#if 0 - Source::IPEndpoints endpoints; - source.fetch (endpoints, m_journal); - - if (! endpoints.empty()) - { - for (Source::IPEndpoints::const_iterator iter (endpoints.begin()); - iter != endpoints.end(); ++iter) - m_legacyCache->insert (*iter); - - if (m_legacyCache->size() > (legacyEndpointCacheSize/2)) - { - m_legacyCache.swap(); - m_legacyCache->clear(); - } - } -#endif - } - - //---------------------------------------------------------------------- + //-------------------------------------------------------------------------- // // Logic // + //-------------------------------------------------------------------------- void setConfig (Config const& config) { @@ -239,13 +194,12 @@ public: m_slots.update (m_config); } - void addStaticSource (Source* source) + void addStaticSource (SharedPtr const& source) { - ScopedPointer p (source); - fetch (*source); + fetch (source); } - void addSource (Source* source) + void addSource (SharedPtr const& source) { m_sources.push_back (source); } @@ -255,83 +209,6 @@ public: m_journal.debug << "Processing Update"; } - //-------------------------------------------------------------------------- - // - // LegacyEndpoint - // - - // Completion handler for a LegacyEndpoint listening test. - // - void onCheckLegacyEndpoint (IPEndpoint const& endpoint, - Checker::Result const& result) - { - if (result.error == boost::asio::error::operation_aborted) - return; - - RelativeTime const now (RelativeTime::fromStartup()); - - if (! result.error) - { - if (result.canAccept) - m_journal.info << "Legacy address " << endpoint << - " passed listening test"; - else - m_journal.warning << "Legacy address " << endpoint << - " cannot accept incoming connections"; - } - else - { - m_journal.error << "Listening test for legacy address " << - endpoint << " failed: " << result.error.message(); - } - } - - void onPeerLegacyEndpoint (IPEndpoint const& address) - { - if (! validIPEndpoint (address)) - return; - std::pair result ( - m_legacyCache.insert (address)); - if (result.second) - { - // its new - m_journal.trace << "New legacy endpoint: " << address; - - // VFALCO NOTE Temporarily we are doing a check on each - // legacy endpoint to test the async code - // - m_checker.async_test (address, bind ( - &Logic::onCheckLegacyEndpoint, - this, address, _1)); - } - } - - //-------------------------------------------------------------------------- - - // Send mtENDPOINTS for each peer as needed - // - void sendEndpoints () - { - if (! m_peers.empty()) - { - m_journal.debug << "Sending mtENDPOINTS"; - - RelativeTime const now (RelativeTime::fromStartup()); - - for (Peers::iterator iter (m_peers.begin()); - iter != m_peers.end(); ++iter) - { - PeerInfo const& peer (*iter); - if (peer.whenSendEndpoints <= now) - { - sendEndpoints (peer); - peer.whenSendEndpoints = now + - RelativeTime (secondsPerEndpoints); - } - } - } - } - // Called when a peer connection is established. // We are guaranteed that the PeerID is not already in our map. // @@ -363,6 +240,85 @@ public: m_peers.erase (iter); } + //-------------------------------------------------------------------------- + // + // CachedEndpoints + // + //-------------------------------------------------------------------------- + + // Returns true if the Endpoint contains no invalid data. + // + bool validEndpoint (Endpoint const& endpoint) + { + // This function is here in case we add more stuff + // we want to validate to the Endpoint struct. + // + return validIPEndpoint (endpoint.address); + } + + // Prunes invalid endpoints from a list. + // + void pruneEndpoints ( + std::string const& source, std::vector & list) + { + for (std::vector ::iterator iter (list.begin()); + iter != list.end();) + { + if (! validEndpoint (*iter)) + { + iter = list.erase (iter); + m_journal.error << + "Invalid endpoint " << iter->address << + " from " << source; + } + else + { + ++iter; + } + } + } + + // Send mtENDPOINTS for the specified peer + // + void sendEndpoints (PeerInfo const& peer) + { + typedef std::vector List; + std::vector endpoints; + + // fill in endpoints + + // Add us to the list if we want incoming + if (m_slots.inboundSlots > 0) + endpoints.push_back (thisEndpoint ()); + + if (! endpoints.empty()) + m_callback.sendPeerEndpoints (peer.id, endpoints); + } + + // Send mtENDPOINTS for each peer as needed + // + void sendEndpoints () + { + if (! m_peers.empty()) + { + m_journal.debug << "Sending mtENDPOINTS"; + + RelativeTime const now (RelativeTime::fromStartup()); + + for (Peers::iterator iter (m_peers.begin()); + iter != m_peers.end(); ++iter) + { + PeerInfo const& peer (*iter); + if (peer.whenSendEndpoints <= now) + { + sendEndpoints (peer); + peer.whenSendEndpoints = now + + RelativeTime (secondsPerEndpoints); + } + } + } + } + // Called when the Checker completes a connectivity test // void onCheckEndpoint (PeerID const& id, @@ -407,71 +363,169 @@ public: } } - // Processes a list of Endpoint received from a peer. + // Called when a peer sends us the mtENDPOINTS message. // - void onPeerEndpoints (PeerID const& id, std::vector endpoints) + void onPeerEndpoints (PeerID const& id, std::vector list) { - pruneEndpoints (endpoints); - Peers::iterator iter (m_peers.find (id)); bassert (iter != m_peers.end()); RelativeTime const now (RelativeTime::fromStartup()); PeerInfo const& peer (*iter); - if (now >= peer.whenReceiveEndpoints) + pruneEndpoints (peer.address.to_string(), list); + + // Log at higher severity if this is the first time + m_journal.stream (peer.whenAcceptEndpoints.isZero() ? + Journal::kInfo : Journal::kTrace) << + "Received " << list.size() << + " endpoints from " << peer.address; + + // We charge a load penalty if the peer sends us more than + // numberOfEndpoints peers in a single message + if (list.size() > numberOfEndpoints) { - m_journal.debug << "Received " << endpoints.size() << - "Endpoint descriptors from " << peer.address; - - // We charge a load penalty if the peer sends us more than - // numberOfEndpoints peers in a single message - if (endpoints.size() > numberOfEndpoints) - { - m_journal.warning << "Charging peer " << peer.address << - " for sending too many endpoints"; + m_journal.warning << "Charging peer " << peer.address << + " for sending too many endpoints"; - m_callback.chargePeerLoadPenalty(id); - } + m_callback.chargePeerLoadPenalty(id); + } - // process the list + // process the list + { + bool foundNeighbor (false); + bool chargedPenalty (false); + for (std::vector ::const_iterator iter (list.begin()); + iter != list.end(); ++iter) { - bool foundZeroHops (false); - bool chargedPenalty (false); - for (std::vector ::const_iterator iter (endpoints.begin()); - iter != endpoints.end(); ++iter) + Endpoint const& endpoint (*iter); + if (endpoint.hops == 0) { - Endpoint const& endpoint (*iter); - if (endpoint.hops == 0) + if (! foundNeighbor) { - if (! foundZeroHops) - { - foundZeroHops = true; - IPEndpoint const address ( - endpoint.address.withPort (endpoint.port)); - m_checker.async_test (address, bind (&Logic::onCheckEndpoint, - this, id, address, _1)); - } - else if (! chargedPenalty) - { - // Only charge them once (?) - chargedPenalty = true; - // More than one zero-hops message?! - m_journal.warning << "Charging peer " << peer.address << - " for sending more than one hops==0 endpoint"; - m_callback.chargePeerLoadPenalty (id); - } + foundNeighbor = true; + // Test the peer's listening port if its the first time + if (! peer.checked) + m_checker.async_test (endpoint.address, bind ( + &Logic::onCheckEndpoint, this, id, + endpoint.address, _1)); + } + else if (! chargedPenalty) + { + // Only charge them once (?) + chargedPenalty = true; + // More than one zero-hops message?! + m_journal.warning << "Charging peer " << peer.address << + " for sending more than one hops==0 endpoint"; + m_callback.chargePeerLoadPenalty (id); } } } + } - peer.whenReceiveEndpoints = now + secondsPerEndpoints; + peer.whenAcceptEndpoints = now + secondsPerEndpoints; + } + + //-------------------------------------------------------------------------- + // + // LegacyEndpoint + // + //-------------------------------------------------------------------------- + + // Fetch addresses into the LegacyEndpointCache for bootstrapping + // + void fetch (SharedPtr const& source) + { + Source::Results results; + + { + { + SharedState::Access state (m_state); + if (state->stopping) + return; + state->fetchSource = source; + } + + source->fetch (results, m_journal); + + { + SharedState::Access state (m_state); + if (state->stopping) + return; + state->fetchSource = nullptr; + } + } + + if (! results.error) + { + std::size_t newEntries (0); + for (std::vector ::const_iterator iter (results.list.begin()); + iter != results.list.end(); ++iter) + { + std::pair result ( + m_legacyCache.insert (*iter)); + if (result.second) + ++newEntries; + } + + m_journal.debug << + "Fetched " << results.list.size() << + " legacy endpoints (" << newEntries << " new) " + "from " << source->name(); } else { - m_journal.warning << "Charging peer " << peer.address << - " for sending too quickly"; - m_callback.chargePeerLoadPenalty (id); + m_journal.error << + "Fetch " << source->name() << "failed: " << + results.error.message(); + } + } + + // Completion handler for a LegacyEndpoint listening test. + // + void onCheckLegacyEndpoint (IPEndpoint const& endpoint, + Checker::Result const& result) + { + if (result.error == boost::asio::error::operation_aborted) + return; + + RelativeTime const now (RelativeTime::fromStartup()); + + if (! result.error) + { + if (result.canAccept) + m_journal.info << "Legacy address " << endpoint << + " passed listening test"; + else + m_journal.warning << "Legacy address " << endpoint << + " cannot accept incoming connections"; + } + else + { + m_journal.error << "Listening test for legacy address " << + endpoint << " failed: " << result.error.message(); + } + } + + void onPeerLegacyEndpoint (IPEndpoint const& address) + { + if (! validIPEndpoint (address)) + return; + std::pair result ( + m_legacyCache.insert (address)); + if (result.second) + { + // its new + m_journal.trace << "New legacy endpoint: " << address; + +#if 0 + // VFALCO NOTE Temporarily we are doing a check on each + // legacy endpoint to test the async code + // + m_checker.async_test (address, bind ( + &Logic::onCheckLegacyEndpoint, + this, address, _1)); +#endif } } }; diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index 6cfd62335..514f40644 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -279,19 +279,12 @@ public: void onStop () { + m_journal.debug << "Stopping"; m_checker.cancel (); - - if (this->Thread::isThreadRunning ()) - { - m_journal.debug << "Stopping"; - m_connectTimer.cancel(); - m_endpointsTimer.cancel(); - m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this)); - } - else - { - stopped(); - } + m_logic.stop (); + m_connectTimer.cancel(); + m_endpointsTimer.cancel(); + m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this)); } //-------------------------------------------------------------------------- diff --git a/src/ripple/peerfinder/impl/PeerInfo.h b/src/ripple/peerfinder/impl/PeerInfo.h index ed1ef993d..34f7c5251 100644 --- a/src/ripple/peerfinder/impl/PeerInfo.h +++ b/src/ripple/peerfinder/impl/PeerInfo.h @@ -39,7 +39,7 @@ struct PeerInfo , checked (inbound_ ? false : true) , canAccept (inbound_ ? false : true) , whenSendEndpoints (RelativeTime::fromStartup()) - , whenReceiveEndpoints (RelativeTime::fromStartup()) + , whenAcceptEndpoints (RelativeTime::fromStartup()) { } @@ -62,7 +62,7 @@ struct PeerInfo // This is to prevent flooding or spamming. Receipt of mtENDPOINTS // sooner than the allotted time should impose a load charge. // - RelativeTime mutable whenReceiveEndpoints; + RelativeTime mutable whenAcceptEndpoints; // All the Endpoint records we have received from this peer Endpoints mutable endpoints; diff --git a/src/ripple/peerfinder/impl/Source.h b/src/ripple/peerfinder/impl/Source.h index 4492f0c42..3ef81fef2 100644 --- a/src/ripple/peerfinder/impl/Source.h +++ b/src/ripple/peerfinder/impl/Source.h @@ -23,17 +23,31 @@ namespace ripple { namespace PeerFinder { -/** A source of IPEndpoint for peers. */ -class Source +/** A static or dynamic source of peer addresses. + These are used as fallbacks when we are bootstrapping and don't have + a local cache, or when none of our addresses are functioning. Typically + sources will represent things like static text in the config file, a + separate local file with addresses, or a remote HTTPS URL that can + be updated automatically. Another solution is to use a custom DNS server + that hands out peer IP addresses when name lookups are performed. +*/ +class Source : public SharedObject { public: - typedef std::vector IPEndpoints; + /** The results of a fetch. */ + struct Results + { + // error_code on a failure + ErrorCode error; + + // list of fetched endpoints + std::vector list; + }; virtual ~Source () { } virtual std::string const& name () = 0; - virtual void cancel () { } - virtual void fetch (IPEndpoints& list, Journal journal) = 0; + virtual void fetch (Results& results, Journal journal) = 0; }; } diff --git a/src/ripple/peerfinder/impl/SourceStrings.cpp b/src/ripple/peerfinder/impl/SourceStrings.cpp index 1735bf3a7..8532cc5f6 100644 --- a/src/ripple/peerfinder/impl/SourceStrings.cpp +++ b/src/ripple/peerfinder/impl/SourceStrings.cpp @@ -38,18 +38,17 @@ public: return m_name; } - void fetch (IPEndpoints& list, Journal journal) + void fetch (Results& results, Journal journal) { - list.resize (0); - list.reserve (m_strings.size()); - + results.list.resize (0); + results.list.reserve (m_strings.size()); for (int i = 0; i < m_strings.size (); ++i) { IPEndpoint ep ( IPEndpoint::from_string_altform ( m_strings [i])); if (! ep.empty()) - list.push_back (ep); + results.list.push_back (ep); } } @@ -60,7 +59,7 @@ private: //------------------------------------------------------------------------------ -SourceStrings* SourceStrings::New (std::string const& name, Strings const& strings) +SharedPtr SourceStrings::New (std::string const& name, Strings const& strings) { return new SourceStringsImp (name, strings); } diff --git a/src/ripple/peerfinder/impl/SourceStrings.h b/src/ripple/peerfinder/impl/SourceStrings.h index c250433c0..5d40da905 100644 --- a/src/ripple/peerfinder/impl/SourceStrings.h +++ b/src/ripple/peerfinder/impl/SourceStrings.h @@ -23,13 +23,13 @@ namespace ripple { namespace PeerFinder { -/** Provides an IPEndpoint list from a set of strings. */ +/** Provides addresses from a static set of strings. */ class SourceStrings : public Source { public: typedef std::vector Strings; - static SourceStrings* New (std::string const& name, Strings const& strings); + static SharedPtr New (std::string const& name, Strings const& strings); }; } diff --git a/src/ripple/peerfinder/impl/Tuning.h b/src/ripple/peerfinder/impl/Tuning.h index 34422bf4b..84874d4ea 100644 --- a/src/ripple/peerfinder/impl/Tuning.h +++ b/src/ripple/peerfinder/impl/Tuning.h @@ -51,6 +51,10 @@ enum // The most Endpoint we will accept in mtENDPOINTS ,numberOfEndpointsMax = 20 + // How long an Endpoint will stay in the cache + // This should be a small multiple of the broadcast frequency + ,cachedEndpointSecondsToLive = 60 + //--------------------------------------------------------- // // LegacyEndpoint Settings diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp index 131e5d0a3..bcb8125e8 100644 --- a/src/ripple/peerfinder/ripple_peerfinder.cpp +++ b/src/ripple/peerfinder/ripple_peerfinder.cpp @@ -28,7 +28,7 @@ #include "beast/modules/beast_core/system/BeforeBoost.h" #include #include -#include +#include #include #include #include @@ -36,6 +36,8 @@ #include "beast/modules/beast_sqdb/beast_sqdb.h" #include "beast/modules/beast_asio/beast_asio.h" +#include "beast/beast/boost/ErrorCode.h" + namespace ripple { using namespace beast; } @@ -43,6 +45,7 @@ using namespace beast; # include "impl/Tuning.h" # include "impl/Checker.h" #include "impl/CheckerAdapter.h" +# include "impl/CachedEndpoint.h" #include "impl/EndpointCache.h" #include "impl/Slots.h" #include "impl/Source.h" diff --git a/src/ripple_app/peers/Peer.cpp b/src/ripple_app/peers/Peer.cpp index b826d707d..0a0124d12 100644 --- a/src/ripple_app/peers/Peer.cpp +++ b/src/ripple_app/peers/Peer.cpp @@ -307,8 +307,15 @@ private: boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes()); m_remoteAddress = IPEndpoint (IPEndpoint::V4 ( bytes[0], bytes[1], bytes[2], bytes[3]), 0); + if (! m_isInbound) + m_remoteAddress = m_remoteAddress.withPort ( + getNativeSocket().remote_endpoint().port()); + } + else + { + // TODO: Support ipv6 + bassertfalse; } - m_remoteAddressSet = true; if (m_socket->getFlags ().set (MultiSocket::Flag::proxy) && m_isInbound) @@ -1683,8 +1690,7 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet) in_addr addr; addr.s_addr = tm.ipv4().ipv4(); IPEndpoint::V4 v4 (ntohl (addr.s_addr)); - endpoint.address = IPEndpoint (v4, 0); - endpoint.port = tm.ipv4().ipv4port (); + endpoint.address = IPEndpoint (v4, tm.ipv4().ipv4port ()); } else { @@ -1695,8 +1701,8 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet) // by performing a connectivity test. // bassert (m_remoteAddressSet); - endpoint.address = m_remoteAddress.withPort (0); - endpoint.port = tm.ipv4().ipv4port (); + endpoint.address = m_remoteAddress.withPort ( + tm.ipv4().ipv4port ()); } // slots diff --git a/src/ripple_app/peers/Peers.cpp b/src/ripple_app/peers/Peers.cpp index 005392fe0..7dd72a7d8 100644 --- a/src/ripple_app/peers/Peers.cpp +++ b/src/ripple_app/peers/Peers.cpp @@ -149,7 +149,7 @@ public: toNetworkByteOrder (ep.address.v4().value)); else tme.mutable_ipv4()->set_ipv4(0); - tme.mutable_ipv4()->set_ipv4port (ep.port); + tme.mutable_ipv4()->set_ipv4port (ep.address.port()); tme.set_hops (ep.hops); tme.set_slots (ep.incomingSlotsAvailable); @@ -718,8 +718,12 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer, mConnectedMap[naPeer] = peer; bNew = true; - // Notify peerfinder since this is a connection that we didn't know about and are keeping - getPeerFinder ().onPeerConnected (RipplePublicKey (peer->getNodePublic()), peer->getPeerEndpoint(), peer->isInbound()); + // Notify peerfinder since this is a connection that we didn't + // know about and are keeping + // + getPeerFinder ().onPeerConnected (RipplePublicKey ( + peer->getNodePublic()), peer->getPeerEndpoint(), + peer->isInbound()); assert (peer->getPeerId () != 0); mPeerIdMap.insert (std::make_pair (peer->getPeerId (), peer));