diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj
index ae525e03d..8a19dd34f 100644
--- a/Builds/VisualStudio2012/RippleD.vcxproj
+++ b/Builds/VisualStudio2012/RippleD.vcxproj
@@ -1641,6 +1641,7 @@
+
diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters
index de9465722..91d84bbae 100644
--- a/Builds/VisualStudio2012/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters
@@ -2217,6 +2217,9 @@
[1] Ripple\peerfinder\impl
+
+ [1] Ripple\peerfinder\impl
+
diff --git a/src/ripple/peerfinder/api/Endpoint.h b/src/ripple/peerfinder/api/Endpoint.h
index 074ce9f0c..7a7b2f311 100644
--- a/src/ripple/peerfinder/api/Endpoint.h
+++ b/src/ripple/peerfinder/api/Endpoint.h
@@ -29,7 +29,6 @@ struct Endpoint
Endpoint ();
IPEndpoint address;
- uint16 port;
int hops;
uint32 incomingSlotsAvailable;
uint32 incomingSlotsMax;
diff --git a/src/ripple/peerfinder/impl/CachedEndpoint.h b/src/ripple/peerfinder/impl/CachedEndpoint.h
new file mode 100644
index 000000000..f70678e1a
--- /dev/null
+++ b/src/ripple/peerfinder/impl/CachedEndpoint.h
@@ -0,0 +1,50 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_PEERFINDER_CACHEDENDPOINT_H_INCLUDED
+#define RIPPLE_PEERFINDER_CACHEDENDPOINT_H_INCLUDED
+
+namespace ripple {
+namespace PeerFinder {
+
+struct CachedEndpoint
+{
+ CachedEndpoint (Endpoint const& endpoint)
+ : hops (endpoint.hops)
+ , incomingSlotsAvailable (endpoint.incomingSlotsAvailable)
+ , incomingSlotsMax (endpoint.incomingSlotsMax)
+ , uptimeMinutes (endpoint.uptimeMinutes)
+ , featureList (endpoint.featureList)
+ {
+ }
+
+ int hops;
+ uint32 incomingSlotsAvailable;
+ uint32 incomingSlotsMax;
+ uint32 uptimeMinutes;
+ std::string featureList;
+
+ // The peer closest to the endpoint, measured in hops.
+ PeerID origin;
+};
+
+}
+}
+
+#endif
diff --git a/src/ripple/peerfinder/impl/Endpoint.cpp b/src/ripple/peerfinder/impl/Endpoint.cpp
index 44979d1e4..25ad77c62 100644
--- a/src/ripple/peerfinder/impl/Endpoint.cpp
+++ b/src/ripple/peerfinder/impl/Endpoint.cpp
@@ -23,8 +23,7 @@ namespace ripple {
namespace PeerFinder {
Endpoint::Endpoint ()
- : port (0)
- , hops (0)
+ : hops (0)
, incomingSlotsAvailable (0)
, incomingSlotsMax (0)
, uptimeMinutes (0)
diff --git a/src/ripple/peerfinder/impl/EndpointCache.h b/src/ripple/peerfinder/impl/EndpointCache.h
index f3ae74b5c..ee644ec54 100644
--- a/src/ripple/peerfinder/impl/EndpointCache.h
+++ b/src/ripple/peerfinder/impl/EndpointCache.h
@@ -23,17 +23,35 @@
namespace ripple {
namespace PeerFinder {
-/** This container holds the master set of Endpoints. */
+/** The Endpoint cache holds the short-lived relayed Endpoint messages.
+*/
class EndpointCache
{
-public:
- EndpointCache ();
- ~EndpointCache ();
-
- // Informs the cache we've received an endpoint.
- void update (Endpoint const& ep);
-
private:
+ typedef boost::unordered_map <
+ IPEndpoint, CachedEndpoint, IPEndpoint::hasher> Table;
+
+ Journal m_journal;
+
+ Table m_now;
+ Table m_prev;
+
+public:
+ explicit EndpointCache (Journal journal)
+ : m_journal (journal)
+ {
+ }
+
+ ~EndpointCache ()
+ {
+ }
+
+ // Insert or update an existing entry with the new message
+ //
+ void update (Endpoint const& ep)
+ {
+
+ }
};
}
diff --git a/src/ripple/peerfinder/impl/LegacyEndpointCache.h b/src/ripple/peerfinder/impl/LegacyEndpointCache.h
index 50b143c98..8c7bc8779 100644
--- a/src/ripple/peerfinder/impl/LegacyEndpointCache.h
+++ b/src/ripple/peerfinder/impl/LegacyEndpointCache.h
@@ -56,6 +56,10 @@ private:
/** Increments the mutation count and updates the database if needed. */
void mutate ()
{
+ // This flag keeps us from updating while we are loading
+ if (m_mutationCount == -1)
+ return;
+
if (++m_mutationCount >= legacyEndpointMutationsPerUpdate)
{
update();
@@ -146,7 +150,7 @@ public:
LegacyEndpointCache (Store& store, Journal journal)
: m_store (store)
, m_journal (journal)
- , m_mutationCount (0)
+ , m_mutationCount (-1)
{
}
@@ -164,11 +168,12 @@ public:
for (List::const_iterator iter (list.begin());
iter != list.end(); ++iter)
{
- std::pair result (insert (*iter));
+ std::pair result (insert (*iter));
if (result.second)
++n;
}
m_journal.debug << "Loaded " << n << " legacy endpoints";
+ m_mutationCount = 0;
}
/** Attempt to insert the endpoint.
@@ -176,7 +181,7 @@ public:
The return value provides a reference to the new or existing endpoint.
The bool indicates whether or not the insertion took place.
*/
- std::pair insert (IPEndpoint const& address)
+ std::pair insert (IPEndpoint const& address)
{
std::pair result (
m_map.insert (LegacyEndpoint (address)));
diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h
index 60cd129a4..dc85f4ea2 100644
--- a/src/ripple/peerfinder/impl/Logic.h
+++ b/src/ripple/peerfinder/impl/Logic.h
@@ -23,6 +23,19 @@
namespace ripple {
namespace PeerFinder {
+// Fresh endpoints are ones we have seen recently via mtENDPOINTS.
+// These are best to give out to someone who needs additional
+// connections as quickly as possible, since it is very likely
+// that the fresh endpoints have open incoming slots.
+//
+// Reliable endpoints are ones which are highly likely to be
+// connectible over long periods of time. They might not necessarily
+// have an incoming slot, but they are good for bootstrapping when
+// there are no peers yet. Typically these are what we would want
+// to store in a database or local config file for a future launch.
+
+//------------------------------------------------------------------------------
+
typedef boost::multi_index_container <
PeerInfo, boost::multi_index::indexed_by <
boost::multi_index::hashed_unique <
@@ -46,25 +59,23 @@ public:
struct State
{
State ()
- {
- }
+ : stopping (false)
+ { }
- // Fresh endpoints are ones we have seen recently via mtENDPOINTS.
- // These are best to give out to someone who needs additional
- // connections as quickly as possible, since it is very likely
- // that the fresh endpoints have open incoming slots.
- //
- //EndpointCache fresh;
+ /** True if we are stopping. */
+ bool stopping;
- // Reliable endpoints are ones which are highly likely to be
- // connectible over long periods of time. They might not necessarily
- // have an incoming slot, but they are good for bootstrapping when
- // there are no peers yet. Typically these are what we would want
- // to store in a database or local config file for a future launch.
- //Endpoints reliable;
+ /** The source we are currently fetching.
+ This is used to cancel I/O during program exit.
+ */
+ SharedPtr fetchSource;
};
- //----------------------------------------------------------------------
+ typedef SharedData SharedState;
+
+ SharedState m_state;
+
+ //--------------------------------------------------------------------------
Callback& m_callback;
Store& m_store;
@@ -72,9 +83,8 @@ public:
Journal m_journal;
Config m_config;
- // A list of dynamic sources consulted as a fallback
- // VFALCO TODO Replace with SharedPtr
- std::vector > m_sources;
+ // A list of dynamic sources to consult as a fallback
+ std::vector > m_sources;
// The current tally of peer slot statistics
Slots m_slots;
@@ -82,9 +92,11 @@ public:
// Our view of the current set of connected peers.
Peers m_peers;
+ EndpointCache m_cache;
+
LegacyEndpointCache m_legacyCache;
- //----------------------------------------------------------------------
+ //--------------------------------------------------------------------------
Logic (
Callback& callback,
@@ -95,11 +107,26 @@ public:
, m_store (store)
, m_checker (checker)
, m_journal (journal)
+ , m_cache (journal)
, m_legacyCache (store, journal)
{
}
- //----------------------------------------------------------------------
+ /** Stop the logic.
+ This will cancel the current fetch and set the stopping flag
+ to `true` to prevent further fetches.
+ Thread safety:
+ Safe to call from any thread.
+ */
+ void stop ()
+ {
+ SharedState::Access state (m_state);
+ state->stopping = true;
+ if (state->fetchSource != nullptr)
+ state->fetchSource->cancel ();
+ }
+
+ //--------------------------------------------------------------------------
// Load persistent state information from the Store
//
@@ -116,8 +143,8 @@ public:
bassert (m_config.wantIncoming);
Endpoint ep;
- // ep.address = ?
- ep.port = m_config.listeningPort;
+ ep.address = IPEndpoint (
+ IPEndpoint::V4 ()).withPort (m_config.listeningPort);
ep.hops = 0;
ep.incomingSlotsAvailable = m_slots.inboundSlots;
ep.incomingSlotsMax = m_slots.inboundSlotsMaximum;
@@ -137,54 +164,6 @@ public:
return true;
}
- // Returns true if the Endpoint contains no invalid data.
- //
- bool validEndpoint (Endpoint const& endpoint)
- {
- return validIPEndpoint (
- endpoint.address.withPort (endpoint.port));
- }
-
- // Prunes invalid endpoints from a list
- //
- void pruneEndpoints (std::vector & list)
- {
- for (std::vector ::iterator iter (list.begin());
- iter != list.end(); ++iter)
- {
- while (! validEndpoint (*iter))
- {
- m_journal.error << "Pruned invalid endpoint " << iter->address;
- iter = list.erase (iter);
- if (iter == list.end())
- break;
- }
- }
- }
-
- // Send mtENDPOINTS for the specified peer
- //
- void sendEndpoints (PeerInfo const& peer)
- {
- typedef std::vector List;
- std::vector endpoints;
-
- // fill in endpoints
-
- // Add us to the list if we want incoming
- if (m_slots.inboundSlots > 0)
- endpoints.push_back (thisEndpoint ());
-
- if (! endpoints.empty())
- m_callback.sendPeerEndpoints (peer.id, endpoints);
- }
-
- // Assembles a list from the legacy endpoint container
- //
- void createLegacyEndpointList (std::vector & list)
- {
- }
-
// Make outgoing connections to bring us up to desired out count
//
void makeOutgoingConnections ()
@@ -203,35 +182,11 @@ public:
}
}
- // Fetch the list of IPEndpoint from the specified source
- //
- void fetch (Source& source)
- {
- m_journal.debug << "Fetching " << source.name();
-
-#if 0
- Source::IPEndpoints endpoints;
- source.fetch (endpoints, m_journal);
-
- if (! endpoints.empty())
- {
- for (Source::IPEndpoints::const_iterator iter (endpoints.begin());
- iter != endpoints.end(); ++iter)
- m_legacyCache->insert (*iter);
-
- if (m_legacyCache->size() > (legacyEndpointCacheSize/2))
- {
- m_legacyCache.swap();
- m_legacyCache->clear();
- }
- }
-#endif
- }
-
- //----------------------------------------------------------------------
+ //--------------------------------------------------------------------------
//
// Logic
//
+ //--------------------------------------------------------------------------
void setConfig (Config const& config)
{
@@ -239,13 +194,12 @@ public:
m_slots.update (m_config);
}
- void addStaticSource (Source* source)
+ void addStaticSource (SharedPtr const& source)
{
- ScopedPointer p (source);
- fetch (*source);
+ fetch (source);
}
- void addSource (Source* source)
+ void addSource (SharedPtr const& source)
{
m_sources.push_back (source);
}
@@ -255,83 +209,6 @@ public:
m_journal.debug << "Processing Update";
}
- //--------------------------------------------------------------------------
- //
- // LegacyEndpoint
- //
-
- // Completion handler for a LegacyEndpoint listening test.
- //
- void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
- Checker::Result const& result)
- {
- if (result.error == boost::asio::error::operation_aborted)
- return;
-
- RelativeTime const now (RelativeTime::fromStartup());
-
- if (! result.error)
- {
- if (result.canAccept)
- m_journal.info << "Legacy address " << endpoint <<
- " passed listening test";
- else
- m_journal.warning << "Legacy address " << endpoint <<
- " cannot accept incoming connections";
- }
- else
- {
- m_journal.error << "Listening test for legacy address " <<
- endpoint << " failed: " << result.error.message();
- }
- }
-
- void onPeerLegacyEndpoint (IPEndpoint const& address)
- {
- if (! validIPEndpoint (address))
- return;
- std::pair result (
- m_legacyCache.insert (address));
- if (result.second)
- {
- // its new
- m_journal.trace << "New legacy endpoint: " << address;
-
- // VFALCO NOTE Temporarily we are doing a check on each
- // legacy endpoint to test the async code
- //
- m_checker.async_test (address, bind (
- &Logic::onCheckLegacyEndpoint,
- this, address, _1));
- }
- }
-
- //--------------------------------------------------------------------------
-
- // Send mtENDPOINTS for each peer as needed
- //
- void sendEndpoints ()
- {
- if (! m_peers.empty())
- {
- m_journal.debug << "Sending mtENDPOINTS";
-
- RelativeTime const now (RelativeTime::fromStartup());
-
- for (Peers::iterator iter (m_peers.begin());
- iter != m_peers.end(); ++iter)
- {
- PeerInfo const& peer (*iter);
- if (peer.whenSendEndpoints <= now)
- {
- sendEndpoints (peer);
- peer.whenSendEndpoints = now +
- RelativeTime (secondsPerEndpoints);
- }
- }
- }
- }
-
// Called when a peer connection is established.
// We are guaranteed that the PeerID is not already in our map.
//
@@ -363,6 +240,85 @@ public:
m_peers.erase (iter);
}
+ //--------------------------------------------------------------------------
+ //
+ // CachedEndpoints
+ //
+ //--------------------------------------------------------------------------
+
+ // Returns true if the Endpoint contains no invalid data.
+ //
+ bool validEndpoint (Endpoint const& endpoint)
+ {
+ // This function is here in case we add more stuff
+ // we want to validate to the Endpoint struct.
+ //
+ return validIPEndpoint (endpoint.address);
+ }
+
+ // Prunes invalid endpoints from a list.
+ //
+ void pruneEndpoints (
+ std::string const& source, std::vector & list)
+ {
+ for (std::vector ::iterator iter (list.begin());
+ iter != list.end();)
+ {
+ if (! validEndpoint (*iter))
+ {
+ iter = list.erase (iter);
+ m_journal.error <<
+ "Invalid endpoint " << iter->address <<
+ " from " << source;
+ }
+ else
+ {
+ ++iter;
+ }
+ }
+ }
+
+ // Send mtENDPOINTS for the specified peer
+ //
+ void sendEndpoints (PeerInfo const& peer)
+ {
+ typedef std::vector List;
+ std::vector endpoints;
+
+ // fill in endpoints
+
+ // Add us to the list if we want incoming
+ if (m_slots.inboundSlots > 0)
+ endpoints.push_back (thisEndpoint ());
+
+ if (! endpoints.empty())
+ m_callback.sendPeerEndpoints (peer.id, endpoints);
+ }
+
+ // Send mtENDPOINTS for each peer as needed
+ //
+ void sendEndpoints ()
+ {
+ if (! m_peers.empty())
+ {
+ m_journal.debug << "Sending mtENDPOINTS";
+
+ RelativeTime const now (RelativeTime::fromStartup());
+
+ for (Peers::iterator iter (m_peers.begin());
+ iter != m_peers.end(); ++iter)
+ {
+ PeerInfo const& peer (*iter);
+ if (peer.whenSendEndpoints <= now)
+ {
+ sendEndpoints (peer);
+ peer.whenSendEndpoints = now +
+ RelativeTime (secondsPerEndpoints);
+ }
+ }
+ }
+ }
+
// Called when the Checker completes a connectivity test
//
void onCheckEndpoint (PeerID const& id,
@@ -407,71 +363,169 @@ public:
}
}
- // Processes a list of Endpoint received from a peer.
+ // Called when a peer sends us the mtENDPOINTS message.
//
- void onPeerEndpoints (PeerID const& id, std::vector endpoints)
+ void onPeerEndpoints (PeerID const& id, std::vector list)
{
- pruneEndpoints (endpoints);
-
Peers::iterator iter (m_peers.find (id));
bassert (iter != m_peers.end());
RelativeTime const now (RelativeTime::fromStartup());
PeerInfo const& peer (*iter);
- if (now >= peer.whenReceiveEndpoints)
+ pruneEndpoints (peer.address.to_string(), list);
+
+ // Log at higher severity if this is the first time
+ m_journal.stream (peer.whenAcceptEndpoints.isZero() ?
+ Journal::kInfo : Journal::kTrace) <<
+ "Received " << list.size() <<
+ " endpoints from " << peer.address;
+
+ // We charge a load penalty if the peer sends us more than
+ // numberOfEndpoints peers in a single message
+ if (list.size() > numberOfEndpoints)
{
- m_journal.debug << "Received " << endpoints.size() <<
- "Endpoint descriptors from " << peer.address;
-
- // We charge a load penalty if the peer sends us more than
- // numberOfEndpoints peers in a single message
- if (endpoints.size() > numberOfEndpoints)
- {
- m_journal.warning << "Charging peer " << peer.address <<
- " for sending too many endpoints";
+ m_journal.warning << "Charging peer " << peer.address <<
+ " for sending too many endpoints";
- m_callback.chargePeerLoadPenalty(id);
- }
+ m_callback.chargePeerLoadPenalty(id);
+ }
- // process the list
+ // process the list
+ {
+ bool foundNeighbor (false);
+ bool chargedPenalty (false);
+ for (std::vector ::const_iterator iter (list.begin());
+ iter != list.end(); ++iter)
{
- bool foundZeroHops (false);
- bool chargedPenalty (false);
- for (std::vector ::const_iterator iter (endpoints.begin());
- iter != endpoints.end(); ++iter)
+ Endpoint const& endpoint (*iter);
+ if (endpoint.hops == 0)
{
- Endpoint const& endpoint (*iter);
- if (endpoint.hops == 0)
+ if (! foundNeighbor)
{
- if (! foundZeroHops)
- {
- foundZeroHops = true;
- IPEndpoint const address (
- endpoint.address.withPort (endpoint.port));
- m_checker.async_test (address, bind (&Logic::onCheckEndpoint,
- this, id, address, _1));
- }
- else if (! chargedPenalty)
- {
- // Only charge them once (?)
- chargedPenalty = true;
- // More than one zero-hops message?!
- m_journal.warning << "Charging peer " << peer.address <<
- " for sending more than one hops==0 endpoint";
- m_callback.chargePeerLoadPenalty (id);
- }
+ foundNeighbor = true;
+ // Test the peer's listening port if its the first time
+ if (! peer.checked)
+ m_checker.async_test (endpoint.address, bind (
+ &Logic::onCheckEndpoint, this, id,
+ endpoint.address, _1));
+ }
+ else if (! chargedPenalty)
+ {
+ // Only charge them once (?)
+ chargedPenalty = true;
+ // More than one zero-hops message?!
+ m_journal.warning << "Charging peer " << peer.address <<
+ " for sending more than one hops==0 endpoint";
+ m_callback.chargePeerLoadPenalty (id);
}
}
}
+ }
- peer.whenReceiveEndpoints = now + secondsPerEndpoints;
+ peer.whenAcceptEndpoints = now + secondsPerEndpoints;
+ }
+
+ //--------------------------------------------------------------------------
+ //
+ // LegacyEndpoint
+ //
+ //--------------------------------------------------------------------------
+
+ // Fetch addresses into the LegacyEndpointCache for bootstrapping
+ //
+ void fetch (SharedPtr const& source)
+ {
+ Source::Results results;
+
+ {
+ {
+ SharedState::Access state (m_state);
+ if (state->stopping)
+ return;
+ state->fetchSource = source;
+ }
+
+ source->fetch (results, m_journal);
+
+ {
+ SharedState::Access state (m_state);
+ if (state->stopping)
+ return;
+ state->fetchSource = nullptr;
+ }
+ }
+
+ if (! results.error)
+ {
+ std::size_t newEntries (0);
+ for (std::vector ::const_iterator iter (results.list.begin());
+ iter != results.list.end(); ++iter)
+ {
+ std::pair result (
+ m_legacyCache.insert (*iter));
+ if (result.second)
+ ++newEntries;
+ }
+
+ m_journal.debug <<
+ "Fetched " << results.list.size() <<
+ " legacy endpoints (" << newEntries << " new) "
+ "from " << source->name();
}
else
{
- m_journal.warning << "Charging peer " << peer.address <<
- " for sending too quickly";
- m_callback.chargePeerLoadPenalty (id);
+ m_journal.error <<
+ "Fetch " << source->name() << "failed: " <<
+ results.error.message();
+ }
+ }
+
+ // Completion handler for a LegacyEndpoint listening test.
+ //
+ void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
+ Checker::Result const& result)
+ {
+ if (result.error == boost::asio::error::operation_aborted)
+ return;
+
+ RelativeTime const now (RelativeTime::fromStartup());
+
+ if (! result.error)
+ {
+ if (result.canAccept)
+ m_journal.info << "Legacy address " << endpoint <<
+ " passed listening test";
+ else
+ m_journal.warning << "Legacy address " << endpoint <<
+ " cannot accept incoming connections";
+ }
+ else
+ {
+ m_journal.error << "Listening test for legacy address " <<
+ endpoint << " failed: " << result.error.message();
+ }
+ }
+
+ void onPeerLegacyEndpoint (IPEndpoint const& address)
+ {
+ if (! validIPEndpoint (address))
+ return;
+ std::pair result (
+ m_legacyCache.insert (address));
+ if (result.second)
+ {
+ // its new
+ m_journal.trace << "New legacy endpoint: " << address;
+
+#if 0
+ // VFALCO NOTE Temporarily we are doing a check on each
+ // legacy endpoint to test the async code
+ //
+ m_checker.async_test (address, bind (
+ &Logic::onCheckLegacyEndpoint,
+ this, address, _1));
+#endif
}
}
};
diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp
index 6cfd62335..514f40644 100644
--- a/src/ripple/peerfinder/impl/Manager.cpp
+++ b/src/ripple/peerfinder/impl/Manager.cpp
@@ -279,19 +279,12 @@ public:
void onStop ()
{
+ m_journal.debug << "Stopping";
m_checker.cancel ();
-
- if (this->Thread::isThreadRunning ())
- {
- m_journal.debug << "Stopping";
- m_connectTimer.cancel();
- m_endpointsTimer.cancel();
- m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
- }
- else
- {
- stopped();
- }
+ m_logic.stop ();
+ m_connectTimer.cancel();
+ m_endpointsTimer.cancel();
+ m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
}
//--------------------------------------------------------------------------
diff --git a/src/ripple/peerfinder/impl/PeerInfo.h b/src/ripple/peerfinder/impl/PeerInfo.h
index ed1ef993d..34f7c5251 100644
--- a/src/ripple/peerfinder/impl/PeerInfo.h
+++ b/src/ripple/peerfinder/impl/PeerInfo.h
@@ -39,7 +39,7 @@ struct PeerInfo
, checked (inbound_ ? false : true)
, canAccept (inbound_ ? false : true)
, whenSendEndpoints (RelativeTime::fromStartup())
- , whenReceiveEndpoints (RelativeTime::fromStartup())
+ , whenAcceptEndpoints (RelativeTime::fromStartup())
{
}
@@ -62,7 +62,7 @@ struct PeerInfo
// This is to prevent flooding or spamming. Receipt of mtENDPOINTS
// sooner than the allotted time should impose a load charge.
//
- RelativeTime mutable whenReceiveEndpoints;
+ RelativeTime mutable whenAcceptEndpoints;
// All the Endpoint records we have received from this peer
Endpoints mutable endpoints;
diff --git a/src/ripple/peerfinder/impl/Source.h b/src/ripple/peerfinder/impl/Source.h
index 4492f0c42..3ef81fef2 100644
--- a/src/ripple/peerfinder/impl/Source.h
+++ b/src/ripple/peerfinder/impl/Source.h
@@ -23,17 +23,31 @@
namespace ripple {
namespace PeerFinder {
-/** A source of IPEndpoint for peers. */
-class Source
+/** A static or dynamic source of peer addresses.
+ These are used as fallbacks when we are bootstrapping and don't have
+ a local cache, or when none of our addresses are functioning. Typically
+ sources will represent things like static text in the config file, a
+ separate local file with addresses, or a remote HTTPS URL that can
+ be updated automatically. Another solution is to use a custom DNS server
+ that hands out peer IP addresses when name lookups are performed.
+*/
+class Source : public SharedObject
{
public:
- typedef std::vector IPEndpoints;
+ /** The results of a fetch. */
+ struct Results
+ {
+ // error_code on a failure
+ ErrorCode error;
+
+ // list of fetched endpoints
+ std::vector list;
+ };
virtual ~Source () { }
virtual std::string const& name () = 0;
-
virtual void cancel () { }
- virtual void fetch (IPEndpoints& list, Journal journal) = 0;
+ virtual void fetch (Results& results, Journal journal) = 0;
};
}
diff --git a/src/ripple/peerfinder/impl/SourceStrings.cpp b/src/ripple/peerfinder/impl/SourceStrings.cpp
index 1735bf3a7..8532cc5f6 100644
--- a/src/ripple/peerfinder/impl/SourceStrings.cpp
+++ b/src/ripple/peerfinder/impl/SourceStrings.cpp
@@ -38,18 +38,17 @@ public:
return m_name;
}
- void fetch (IPEndpoints& list, Journal journal)
+ void fetch (Results& results, Journal journal)
{
- list.resize (0);
- list.reserve (m_strings.size());
-
+ results.list.resize (0);
+ results.list.reserve (m_strings.size());
for (int i = 0; i < m_strings.size (); ++i)
{
IPEndpoint ep (
IPEndpoint::from_string_altform (
m_strings [i]));
if (! ep.empty())
- list.push_back (ep);
+ results.list.push_back (ep);
}
}
@@ -60,7 +59,7 @@ private:
//------------------------------------------------------------------------------
-SourceStrings* SourceStrings::New (std::string const& name, Strings const& strings)
+SharedPtr SourceStrings::New (std::string const& name, Strings const& strings)
{
return new SourceStringsImp (name, strings);
}
diff --git a/src/ripple/peerfinder/impl/SourceStrings.h b/src/ripple/peerfinder/impl/SourceStrings.h
index c250433c0..5d40da905 100644
--- a/src/ripple/peerfinder/impl/SourceStrings.h
+++ b/src/ripple/peerfinder/impl/SourceStrings.h
@@ -23,13 +23,13 @@
namespace ripple {
namespace PeerFinder {
-/** Provides an IPEndpoint list from a set of strings. */
+/** Provides addresses from a static set of strings. */
class SourceStrings : public Source
{
public:
typedef std::vector Strings;
- static SourceStrings* New (std::string const& name, Strings const& strings);
+ static SharedPtr New (std::string const& name, Strings const& strings);
};
}
diff --git a/src/ripple/peerfinder/impl/Tuning.h b/src/ripple/peerfinder/impl/Tuning.h
index 34422bf4b..84874d4ea 100644
--- a/src/ripple/peerfinder/impl/Tuning.h
+++ b/src/ripple/peerfinder/impl/Tuning.h
@@ -51,6 +51,10 @@ enum
// The most Endpoint we will accept in mtENDPOINTS
,numberOfEndpointsMax = 20
+ // How long an Endpoint will stay in the cache
+ // This should be a small multiple of the broadcast frequency
+ ,cachedEndpointSecondsToLive = 60
+
//---------------------------------------------------------
//
// LegacyEndpoint Settings
diff --git a/src/ripple/peerfinder/ripple_peerfinder.cpp b/src/ripple/peerfinder/ripple_peerfinder.cpp
index 131e5d0a3..bcb8125e8 100644
--- a/src/ripple/peerfinder/ripple_peerfinder.cpp
+++ b/src/ripple/peerfinder/ripple_peerfinder.cpp
@@ -28,7 +28,7 @@
#include "beast/modules/beast_core/system/BeforeBoost.h"
#include
#include
-#include
+#include
#include
#include
#include
@@ -36,6 +36,8 @@
#include "beast/modules/beast_sqdb/beast_sqdb.h"
#include "beast/modules/beast_asio/beast_asio.h"
+#include "beast/beast/boost/ErrorCode.h"
+
namespace ripple {
using namespace beast;
}
@@ -43,6 +45,7 @@ using namespace beast;
# include "impl/Tuning.h"
# include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
+# include "impl/CachedEndpoint.h"
#include "impl/EndpointCache.h"
#include "impl/Slots.h"
#include "impl/Source.h"
diff --git a/src/ripple_app/peers/Peer.cpp b/src/ripple_app/peers/Peer.cpp
index b826d707d..0a0124d12 100644
--- a/src/ripple_app/peers/Peer.cpp
+++ b/src/ripple_app/peers/Peer.cpp
@@ -307,8 +307,15 @@ private:
boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes());
m_remoteAddress = IPEndpoint (IPEndpoint::V4 (
bytes[0], bytes[1], bytes[2], bytes[3]), 0);
+ if (! m_isInbound)
+ m_remoteAddress = m_remoteAddress.withPort (
+ getNativeSocket().remote_endpoint().port());
+ }
+ else
+ {
+ // TODO: Support ipv6
+ bassertfalse;
}
-
m_remoteAddressSet = true;
if (m_socket->getFlags ().set (MultiSocket::Flag::proxy) && m_isInbound)
@@ -1683,8 +1690,7 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
in_addr addr;
addr.s_addr = tm.ipv4().ipv4();
IPEndpoint::V4 v4 (ntohl (addr.s_addr));
- endpoint.address = IPEndpoint (v4, 0);
- endpoint.port = tm.ipv4().ipv4port ();
+ endpoint.address = IPEndpoint (v4, tm.ipv4().ipv4port ());
}
else
{
@@ -1695,8 +1701,8 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
// by performing a connectivity test.
//
bassert (m_remoteAddressSet);
- endpoint.address = m_remoteAddress.withPort (0);
- endpoint.port = tm.ipv4().ipv4port ();
+ endpoint.address = m_remoteAddress.withPort (
+ tm.ipv4().ipv4port ());
}
// slots
diff --git a/src/ripple_app/peers/Peers.cpp b/src/ripple_app/peers/Peers.cpp
index 005392fe0..7dd72a7d8 100644
--- a/src/ripple_app/peers/Peers.cpp
+++ b/src/ripple_app/peers/Peers.cpp
@@ -149,7 +149,7 @@ public:
toNetworkByteOrder (ep.address.v4().value));
else
tme.mutable_ipv4()->set_ipv4(0);
- tme.mutable_ipv4()->set_ipv4port (ep.port);
+ tme.mutable_ipv4()->set_ipv4port (ep.address.port());
tme.set_hops (ep.hops);
tme.set_slots (ep.incomingSlotsAvailable);
@@ -718,8 +718,12 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer,
mConnectedMap[naPeer] = peer;
bNew = true;
- // Notify peerfinder since this is a connection that we didn't know about and are keeping
- getPeerFinder ().onPeerConnected (RipplePublicKey (peer->getNodePublic()), peer->getPeerEndpoint(), peer->isInbound());
+ // Notify peerfinder since this is a connection that we didn't
+ // know about and are keeping
+ //
+ getPeerFinder ().onPeerConnected (RipplePublicKey (
+ peer->getNodePublic()), peer->getPeerEndpoint(),
+ peer->isInbound());
assert (peer->getPeerId () != 0);
mPeerIdMap.insert (std::make_pair (peer->getPeerId (), peer));