PeerFinder work, Source fetch, show port in log

This commit is contained in:
Vinnie Falco
2013-10-04 21:29:13 -07:00
parent 30ff139a29
commit 82d8d9a092
17 changed files with 423 additions and 271 deletions

View File

@@ -1641,6 +1641,7 @@
<ClInclude Include="..\..\src\ripple\peerfinder\api\Endpoint.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\api\Endpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\api\Manager.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\api\Manager.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\api\Types.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\api\Types.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CachedEndpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\EndpointCache.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\EndpointCache.h" />

View File

@@ -2217,6 +2217,9 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h"> <ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h">
<Filter>[1] Ripple\peerfinder\impl</Filter> <Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CachedEndpoint.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto"> <CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -29,7 +29,6 @@ struct Endpoint
Endpoint (); Endpoint ();
IPEndpoint address; IPEndpoint address;
uint16 port;
int hops; int hops;
uint32 incomingSlotsAvailable; uint32 incomingSlotsAvailable;
uint32 incomingSlotsMax; uint32 incomingSlotsMax;

View File

@@ -0,0 +1,50 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_CACHEDENDPOINT_H_INCLUDED
#define RIPPLE_PEERFINDER_CACHEDENDPOINT_H_INCLUDED
namespace ripple {
namespace PeerFinder {
struct CachedEndpoint
{
CachedEndpoint (Endpoint const& endpoint)
: hops (endpoint.hops)
, incomingSlotsAvailable (endpoint.incomingSlotsAvailable)
, incomingSlotsMax (endpoint.incomingSlotsMax)
, uptimeMinutes (endpoint.uptimeMinutes)
, featureList (endpoint.featureList)
{
}
int hops;
uint32 incomingSlotsAvailable;
uint32 incomingSlotsMax;
uint32 uptimeMinutes;
std::string featureList;
// The peer closest to the endpoint, measured in hops.
PeerID origin;
};
}
}
#endif

View File

@@ -23,8 +23,7 @@ namespace ripple {
namespace PeerFinder { namespace PeerFinder {
Endpoint::Endpoint () Endpoint::Endpoint ()
: port (0) : hops (0)
, hops (0)
, incomingSlotsAvailable (0) , incomingSlotsAvailable (0)
, incomingSlotsMax (0) , incomingSlotsMax (0)
, uptimeMinutes (0) , uptimeMinutes (0)

View File

@@ -23,17 +23,35 @@
namespace ripple { namespace ripple {
namespace PeerFinder { namespace PeerFinder {
/** This container holds the master set of Endpoints. */ /** The Endpoint cache holds the short-lived relayed Endpoint messages.
*/
class EndpointCache class EndpointCache
{ {
public:
EndpointCache ();
~EndpointCache ();
// Informs the cache we've received an endpoint.
void update (Endpoint const& ep);
private: private:
typedef boost::unordered_map <
IPEndpoint, CachedEndpoint, IPEndpoint::hasher> Table;
Journal m_journal;
Table m_now;
Table m_prev;
public:
explicit EndpointCache (Journal journal)
: m_journal (journal)
{
}
~EndpointCache ()
{
}
// Insert or update an existing entry with the new message
//
void update (Endpoint const& ep)
{
}
}; };
} }

View File

@@ -56,6 +56,10 @@ private:
/** Increments the mutation count and updates the database if needed. */ /** Increments the mutation count and updates the database if needed. */
void mutate () void mutate ()
{ {
// This flag keeps us from updating while we are loading
if (m_mutationCount == -1)
return;
if (++m_mutationCount >= legacyEndpointMutationsPerUpdate) if (++m_mutationCount >= legacyEndpointMutationsPerUpdate)
{ {
update(); update();
@@ -146,7 +150,7 @@ public:
LegacyEndpointCache (Store& store, Journal journal) LegacyEndpointCache (Store& store, Journal journal)
: m_store (store) : m_store (store)
, m_journal (journal) , m_journal (journal)
, m_mutationCount (0) , m_mutationCount (-1)
{ {
} }
@@ -164,11 +168,12 @@ public:
for (List::const_iterator iter (list.begin()); for (List::const_iterator iter (list.begin());
iter != list.end(); ++iter) iter != list.end(); ++iter)
{ {
std::pair <LegacyEndpoint&, bool> result (insert (*iter)); std::pair <LegacyEndpoint const&, bool> result (insert (*iter));
if (result.second) if (result.second)
++n; ++n;
} }
m_journal.debug << "Loaded " << n << " legacy endpoints"; m_journal.debug << "Loaded " << n << " legacy endpoints";
m_mutationCount = 0;
} }
/** Attempt to insert the endpoint. /** Attempt to insert the endpoint.
@@ -176,7 +181,7 @@ public:
The return value provides a reference to the new or existing endpoint. The return value provides a reference to the new or existing endpoint.
The bool indicates whether or not the insertion took place. The bool indicates whether or not the insertion took place.
*/ */
std::pair <LegacyEndpoint&, bool> insert (IPEndpoint const& address) std::pair <LegacyEndpoint const&, bool> insert (IPEndpoint const& address)
{ {
std::pair <MapType::iterator, bool> result ( std::pair <MapType::iterator, bool> result (
m_map.insert (LegacyEndpoint (address))); m_map.insert (LegacyEndpoint (address)));

View File

@@ -23,6 +23,19 @@
namespace ripple { namespace ripple {
namespace PeerFinder { namespace PeerFinder {
// Fresh endpoints are ones we have seen recently via mtENDPOINTS.
// These are best to give out to someone who needs additional
// connections as quickly as possible, since it is very likely
// that the fresh endpoints have open incoming slots.
//
// Reliable endpoints are ones which are highly likely to be
// connectible over long periods of time. They might not necessarily
// have an incoming slot, but they are good for bootstrapping when
// there are no peers yet. Typically these are what we would want
// to store in a database or local config file for a future launch.
//------------------------------------------------------------------------------
typedef boost::multi_index_container < typedef boost::multi_index_container <
PeerInfo, boost::multi_index::indexed_by < PeerInfo, boost::multi_index::indexed_by <
boost::multi_index::hashed_unique < boost::multi_index::hashed_unique <
@@ -46,25 +59,23 @@ public:
struct State struct State
{ {
State () State ()
{ : stopping (false)
} { }
// Fresh endpoints are ones we have seen recently via mtENDPOINTS. /** True if we are stopping. */
// These are best to give out to someone who needs additional bool stopping;
// connections as quickly as possible, since it is very likely
// that the fresh endpoints have open incoming slots.
//
//EndpointCache fresh;
// Reliable endpoints are ones which are highly likely to be /** The source we are currently fetching.
// connectible over long periods of time. They might not necessarily This is used to cancel I/O during program exit.
// have an incoming slot, but they are good for bootstrapping when */
// there are no peers yet. Typically these are what we would want SharedPtr <Source> fetchSource;
// to store in a database or local config file for a future launch.
//Endpoints reliable;
}; };
//---------------------------------------------------------------------- typedef SharedData <State> SharedState;
SharedState m_state;
//--------------------------------------------------------------------------
Callback& m_callback; Callback& m_callback;
Store& m_store; Store& m_store;
@@ -72,9 +83,8 @@ public:
Journal m_journal; Journal m_journal;
Config m_config; Config m_config;
// A list of dynamic sources consulted as a fallback // A list of dynamic sources to consult as a fallback
// VFALCO TODO Replace with SharedPtr <Source> std::vector <SharedPtr <Source> > m_sources;
std::vector <ScopedPointer <Source> > m_sources;
// The current tally of peer slot statistics // The current tally of peer slot statistics
Slots m_slots; Slots m_slots;
@@ -82,9 +92,11 @@ public:
// Our view of the current set of connected peers. // Our view of the current set of connected peers.
Peers m_peers; Peers m_peers;
EndpointCache m_cache;
LegacyEndpointCache m_legacyCache; LegacyEndpointCache m_legacyCache;
//---------------------------------------------------------------------- //--------------------------------------------------------------------------
Logic ( Logic (
Callback& callback, Callback& callback,
@@ -95,11 +107,26 @@ public:
, m_store (store) , m_store (store)
, m_checker (checker) , m_checker (checker)
, m_journal (journal) , m_journal (journal)
, m_cache (journal)
, m_legacyCache (store, journal) , m_legacyCache (store, journal)
{ {
} }
//---------------------------------------------------------------------- /** Stop the logic.
This will cancel the current fetch and set the stopping flag
to `true` to prevent further fetches.
Thread safety:
Safe to call from any thread.
*/
void stop ()
{
SharedState::Access state (m_state);
state->stopping = true;
if (state->fetchSource != nullptr)
state->fetchSource->cancel ();
}
//--------------------------------------------------------------------------
// Load persistent state information from the Store // Load persistent state information from the Store
// //
@@ -116,8 +143,8 @@ public:
bassert (m_config.wantIncoming); bassert (m_config.wantIncoming);
Endpoint ep; Endpoint ep;
// ep.address = ? ep.address = IPEndpoint (
ep.port = m_config.listeningPort; IPEndpoint::V4 ()).withPort (m_config.listeningPort);
ep.hops = 0; ep.hops = 0;
ep.incomingSlotsAvailable = m_slots.inboundSlots; ep.incomingSlotsAvailable = m_slots.inboundSlots;
ep.incomingSlotsMax = m_slots.inboundSlotsMaximum; ep.incomingSlotsMax = m_slots.inboundSlotsMaximum;
@@ -137,54 +164,6 @@ public:
return true; return true;
} }
// Returns true if the Endpoint contains no invalid data.
//
bool validEndpoint (Endpoint const& endpoint)
{
return validIPEndpoint (
endpoint.address.withPort (endpoint.port));
}
// Prunes invalid endpoints from a list
//
void pruneEndpoints (std::vector <Endpoint>& list)
{
for (std::vector <Endpoint>::iterator iter (list.begin());
iter != list.end(); ++iter)
{
while (! validEndpoint (*iter))
{
m_journal.error << "Pruned invalid endpoint " << iter->address;
iter = list.erase (iter);
if (iter == list.end())
break;
}
}
}
// Send mtENDPOINTS for the specified peer
//
void sendEndpoints (PeerInfo const& peer)
{
typedef std::vector <Endpoint> List;
std::vector <Endpoint> endpoints;
// fill in endpoints
// Add us to the list if we want incoming
if (m_slots.inboundSlots > 0)
endpoints.push_back (thisEndpoint ());
if (! endpoints.empty())
m_callback.sendPeerEndpoints (peer.id, endpoints);
}
// Assembles a list from the legacy endpoint container
//
void createLegacyEndpointList (std::vector <IPEndpoint>& list)
{
}
// Make outgoing connections to bring us up to desired out count // Make outgoing connections to bring us up to desired out count
// //
void makeOutgoingConnections () void makeOutgoingConnections ()
@@ -203,35 +182,11 @@ public:
} }
} }
// Fetch the list of IPEndpoint from the specified source //--------------------------------------------------------------------------
//
void fetch (Source& source)
{
m_journal.debug << "Fetching " << source.name();
#if 0
Source::IPEndpoints endpoints;
source.fetch (endpoints, m_journal);
if (! endpoints.empty())
{
for (Source::IPEndpoints::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
m_legacyCache->insert (*iter);
if (m_legacyCache->size() > (legacyEndpointCacheSize/2))
{
m_legacyCache.swap();
m_legacyCache->clear();
}
}
#endif
}
//----------------------------------------------------------------------
// //
// Logic // Logic
// //
//--------------------------------------------------------------------------
void setConfig (Config const& config) void setConfig (Config const& config)
{ {
@@ -239,13 +194,12 @@ public:
m_slots.update (m_config); m_slots.update (m_config);
} }
void addStaticSource (Source* source) void addStaticSource (SharedPtr <Source> const& source)
{ {
ScopedPointer <Source> p (source); fetch (source);
fetch (*source);
} }
void addSource (Source* source) void addSource (SharedPtr <Source> const& source)
{ {
m_sources.push_back (source); m_sources.push_back (source);
} }
@@ -255,83 +209,6 @@ public:
m_journal.debug << "Processing Update"; m_journal.debug << "Processing Update";
} }
//--------------------------------------------------------------------------
//
// LegacyEndpoint
//
// Completion handler for a LegacyEndpoint listening test.
//
void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
RelativeTime const now (RelativeTime::fromStartup());
if (! result.error)
{
if (result.canAccept)
m_journal.info << "Legacy address " << endpoint <<
" passed listening test";
else
m_journal.warning << "Legacy address " << endpoint <<
" cannot accept incoming connections";
}
else
{
m_journal.error << "Listening test for legacy address " <<
endpoint << " failed: " << result.error.message();
}
}
void onPeerLegacyEndpoint (IPEndpoint const& address)
{
if (! validIPEndpoint (address))
return;
std::pair <LegacyEndpoint&, bool> result (
m_legacyCache.insert (address));
if (result.second)
{
// its new
m_journal.trace << "New legacy endpoint: " << address;
// VFALCO NOTE Temporarily we are doing a check on each
// legacy endpoint to test the async code
//
m_checker.async_test (address, bind (
&Logic::onCheckLegacyEndpoint,
this, address, _1));
}
}
//--------------------------------------------------------------------------
// Send mtENDPOINTS for each peer as needed
//
void sendEndpoints ()
{
if (! m_peers.empty())
{
m_journal.debug << "Sending mtENDPOINTS";
RelativeTime const now (RelativeTime::fromStartup());
for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter)
{
PeerInfo const& peer (*iter);
if (peer.whenSendEndpoints <= now)
{
sendEndpoints (peer);
peer.whenSendEndpoints = now +
RelativeTime (secondsPerEndpoints);
}
}
}
}
// Called when a peer connection is established. // Called when a peer connection is established.
// We are guaranteed that the PeerID is not already in our map. // We are guaranteed that the PeerID is not already in our map.
// //
@@ -363,6 +240,85 @@ public:
m_peers.erase (iter); m_peers.erase (iter);
} }
//--------------------------------------------------------------------------
//
// CachedEndpoints
//
//--------------------------------------------------------------------------
// Returns true if the Endpoint contains no invalid data.
//
bool validEndpoint (Endpoint const& endpoint)
{
// This function is here in case we add more stuff
// we want to validate to the Endpoint struct.
//
return validIPEndpoint (endpoint.address);
}
// Prunes invalid endpoints from a list.
//
void pruneEndpoints (
std::string const& source, std::vector <Endpoint>& list)
{
for (std::vector <Endpoint>::iterator iter (list.begin());
iter != list.end();)
{
if (! validEndpoint (*iter))
{
iter = list.erase (iter);
m_journal.error <<
"Invalid endpoint " << iter->address <<
" from " << source;
}
else
{
++iter;
}
}
}
// Send mtENDPOINTS for the specified peer
//
void sendEndpoints (PeerInfo const& peer)
{
typedef std::vector <Endpoint> List;
std::vector <Endpoint> endpoints;
// fill in endpoints
// Add us to the list if we want incoming
if (m_slots.inboundSlots > 0)
endpoints.push_back (thisEndpoint ());
if (! endpoints.empty())
m_callback.sendPeerEndpoints (peer.id, endpoints);
}
// Send mtENDPOINTS for each peer as needed
//
void sendEndpoints ()
{
if (! m_peers.empty())
{
m_journal.debug << "Sending mtENDPOINTS";
RelativeTime const now (RelativeTime::fromStartup());
for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter)
{
PeerInfo const& peer (*iter);
if (peer.whenSendEndpoints <= now)
{
sendEndpoints (peer);
peer.whenSendEndpoints = now +
RelativeTime (secondsPerEndpoints);
}
}
}
}
// Called when the Checker completes a connectivity test // Called when the Checker completes a connectivity test
// //
void onCheckEndpoint (PeerID const& id, void onCheckEndpoint (PeerID const& id,
@@ -407,26 +363,27 @@ public:
} }
} }
// Processes a list of Endpoint received from a peer. // Called when a peer sends us the mtENDPOINTS message.
// //
void onPeerEndpoints (PeerID const& id, std::vector <Endpoint> endpoints) void onPeerEndpoints (PeerID const& id, std::vector <Endpoint> list)
{ {
pruneEndpoints (endpoints);
Peers::iterator iter (m_peers.find (id)); Peers::iterator iter (m_peers.find (id));
bassert (iter != m_peers.end()); bassert (iter != m_peers.end());
RelativeTime const now (RelativeTime::fromStartup()); RelativeTime const now (RelativeTime::fromStartup());
PeerInfo const& peer (*iter); PeerInfo const& peer (*iter);
if (now >= peer.whenReceiveEndpoints) pruneEndpoints (peer.address.to_string(), list);
{
m_journal.debug << "Received " << endpoints.size() << // Log at higher severity if this is the first time
"Endpoint descriptors from " << peer.address; m_journal.stream (peer.whenAcceptEndpoints.isZero() ?
Journal::kInfo : Journal::kTrace) <<
"Received " << list.size() <<
" endpoints from " << peer.address;
// We charge a load penalty if the peer sends us more than // We charge a load penalty if the peer sends us more than
// numberOfEndpoints peers in a single message // numberOfEndpoints peers in a single message
if (endpoints.size() > numberOfEndpoints) if (list.size() > numberOfEndpoints)
{ {
m_journal.warning << "Charging peer " << peer.address << m_journal.warning << "Charging peer " << peer.address <<
" for sending too many endpoints"; " for sending too many endpoints";
@@ -436,21 +393,22 @@ public:
// process the list // process the list
{ {
bool foundZeroHops (false); bool foundNeighbor (false);
bool chargedPenalty (false); bool chargedPenalty (false);
for (std::vector <Endpoint>::const_iterator iter (endpoints.begin()); for (std::vector <Endpoint>::const_iterator iter (list.begin());
iter != endpoints.end(); ++iter) iter != list.end(); ++iter)
{ {
Endpoint const& endpoint (*iter); Endpoint const& endpoint (*iter);
if (endpoint.hops == 0) if (endpoint.hops == 0)
{ {
if (! foundZeroHops) if (! foundNeighbor)
{ {
foundZeroHops = true; foundNeighbor = true;
IPEndpoint const address ( // Test the peer's listening port if its the first time
endpoint.address.withPort (endpoint.port)); if (! peer.checked)
m_checker.async_test (address, bind (&Logic::onCheckEndpoint, m_checker.async_test (endpoint.address, bind (
this, id, address, _1)); &Logic::onCheckEndpoint, this, id,
endpoint.address, _1));
} }
else if (! chargedPenalty) else if (! chargedPenalty)
{ {
@@ -465,13 +423,109 @@ public:
} }
} }
peer.whenReceiveEndpoints = now + secondsPerEndpoints; peer.whenAcceptEndpoints = now + secondsPerEndpoints;
}
//--------------------------------------------------------------------------
//
// LegacyEndpoint
//
//--------------------------------------------------------------------------
// Fetch addresses into the LegacyEndpointCache for bootstrapping
//
void fetch (SharedPtr <Source> const& source)
{
Source::Results results;
{
{
SharedState::Access state (m_state);
if (state->stopping)
return;
state->fetchSource = source;
}
source->fetch (results, m_journal);
{
SharedState::Access state (m_state);
if (state->stopping)
return;
state->fetchSource = nullptr;
}
}
if (! results.error)
{
std::size_t newEntries (0);
for (std::vector <IPEndpoint>::const_iterator iter (results.list.begin());
iter != results.list.end(); ++iter)
{
std::pair <LegacyEndpoint const&, bool> result (
m_legacyCache.insert (*iter));
if (result.second)
++newEntries;
}
m_journal.debug <<
"Fetched " << results.list.size() <<
" legacy endpoints (" << newEntries << " new) "
"from " << source->name();
} }
else else
{ {
m_journal.warning << "Charging peer " << peer.address << m_journal.error <<
" for sending too quickly"; "Fetch " << source->name() << "failed: " <<
m_callback.chargePeerLoadPenalty (id); results.error.message();
}
}
// Completion handler for a LegacyEndpoint listening test.
//
void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
RelativeTime const now (RelativeTime::fromStartup());
if (! result.error)
{
if (result.canAccept)
m_journal.info << "Legacy address " << endpoint <<
" passed listening test";
else
m_journal.warning << "Legacy address " << endpoint <<
" cannot accept incoming connections";
}
else
{
m_journal.error << "Listening test for legacy address " <<
endpoint << " failed: " << result.error.message();
}
}
void onPeerLegacyEndpoint (IPEndpoint const& address)
{
if (! validIPEndpoint (address))
return;
std::pair <LegacyEndpoint const&, bool> result (
m_legacyCache.insert (address));
if (result.second)
{
// its new
m_journal.trace << "New legacy endpoint: " << address;
#if 0
// VFALCO NOTE Temporarily we are doing a check on each
// legacy endpoint to test the async code
//
m_checker.async_test (address, bind (
&Logic::onCheckLegacyEndpoint,
this, address, _1));
#endif
} }
} }
}; };

View File

@@ -278,21 +278,14 @@ public:
} }
void onStop () void onStop ()
{
m_checker.cancel ();
if (this->Thread::isThreadRunning ())
{ {
m_journal.debug << "Stopping"; m_journal.debug << "Stopping";
m_checker.cancel ();
m_logic.stop ();
m_connectTimer.cancel(); m_connectTimer.cancel();
m_endpointsTimer.cancel(); m_endpointsTimer.cancel();
m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this)); m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
} }
else
{
stopped();
}
}
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------

View File

@@ -39,7 +39,7 @@ struct PeerInfo
, checked (inbound_ ? false : true) , checked (inbound_ ? false : true)
, canAccept (inbound_ ? false : true) , canAccept (inbound_ ? false : true)
, whenSendEndpoints (RelativeTime::fromStartup()) , whenSendEndpoints (RelativeTime::fromStartup())
, whenReceiveEndpoints (RelativeTime::fromStartup()) , whenAcceptEndpoints (RelativeTime::fromStartup())
{ {
} }
@@ -62,7 +62,7 @@ struct PeerInfo
// This is to prevent flooding or spamming. Receipt of mtENDPOINTS // This is to prevent flooding or spamming. Receipt of mtENDPOINTS
// sooner than the allotted time should impose a load charge. // sooner than the allotted time should impose a load charge.
// //
RelativeTime mutable whenReceiveEndpoints; RelativeTime mutable whenAcceptEndpoints;
// All the Endpoint records we have received from this peer // All the Endpoint records we have received from this peer
Endpoints mutable endpoints; Endpoints mutable endpoints;

View File

@@ -23,17 +23,31 @@
namespace ripple { namespace ripple {
namespace PeerFinder { namespace PeerFinder {
/** A source of IPEndpoint for peers. */ /** A static or dynamic source of peer addresses.
class Source These are used as fallbacks when we are bootstrapping and don't have
a local cache, or when none of our addresses are functioning. Typically
sources will represent things like static text in the config file, a
separate local file with addresses, or a remote HTTPS URL that can
be updated automatically. Another solution is to use a custom DNS server
that hands out peer IP addresses when name lookups are performed.
*/
class Source : public SharedObject
{ {
public: public:
typedef std::vector <IPEndpoint> IPEndpoints; /** The results of a fetch. */
struct Results
{
// error_code on a failure
ErrorCode error;
// list of fetched endpoints
std::vector <IPEndpoint> list;
};
virtual ~Source () { } virtual ~Source () { }
virtual std::string const& name () = 0; virtual std::string const& name () = 0;
virtual void cancel () { } virtual void cancel () { }
virtual void fetch (IPEndpoints& list, Journal journal) = 0; virtual void fetch (Results& results, Journal journal) = 0;
}; };
} }

View File

@@ -38,18 +38,17 @@ public:
return m_name; return m_name;
} }
void fetch (IPEndpoints& list, Journal journal) void fetch (Results& results, Journal journal)
{ {
list.resize (0); results.list.resize (0);
list.reserve (m_strings.size()); results.list.reserve (m_strings.size());
for (int i = 0; i < m_strings.size (); ++i) for (int i = 0; i < m_strings.size (); ++i)
{ {
IPEndpoint ep ( IPEndpoint ep (
IPEndpoint::from_string_altform ( IPEndpoint::from_string_altform (
m_strings [i])); m_strings [i]));
if (! ep.empty()) if (! ep.empty())
list.push_back (ep); results.list.push_back (ep);
} }
} }
@@ -60,7 +59,7 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
SourceStrings* SourceStrings::New (std::string const& name, Strings const& strings) SharedPtr <Source> SourceStrings::New (std::string const& name, Strings const& strings)
{ {
return new SourceStringsImp (name, strings); return new SourceStringsImp (name, strings);
} }

View File

@@ -23,13 +23,13 @@
namespace ripple { namespace ripple {
namespace PeerFinder { namespace PeerFinder {
/** Provides an IPEndpoint list from a set of strings. */ /** Provides addresses from a static set of strings. */
class SourceStrings : public Source class SourceStrings : public Source
{ {
public: public:
typedef std::vector <std::string> Strings; typedef std::vector <std::string> Strings;
static SourceStrings* New (std::string const& name, Strings const& strings); static SharedPtr <Source> New (std::string const& name, Strings const& strings);
}; };
} }

View File

@@ -51,6 +51,10 @@ enum
// The most Endpoint we will accept in mtENDPOINTS // The most Endpoint we will accept in mtENDPOINTS
,numberOfEndpointsMax = 20 ,numberOfEndpointsMax = 20
// How long an Endpoint will stay in the cache
// This should be a small multiple of the broadcast frequency
,cachedEndpointSecondsToLive = 60
//--------------------------------------------------------- //---------------------------------------------------------
// //
// LegacyEndpoint Settings // LegacyEndpoint Settings

View File

@@ -28,7 +28,7 @@
#include "beast/modules/beast_core/system/BeforeBoost.h" #include "beast/modules/beast_core/system/BeforeBoost.h"
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <boost/regex.hpp> #include <boost/regex.hpp>
#include <boost/unordered_set.hpp> #include <boost/unordered_map.hpp>
#include <boost/multi_index_container.hpp> #include <boost/multi_index_container.hpp>
#include <boost/multi_index/hashed_index.hpp> #include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/key_extractors.hpp> #include <boost/multi_index/key_extractors.hpp>
@@ -36,6 +36,8 @@
#include "beast/modules/beast_sqdb/beast_sqdb.h" #include "beast/modules/beast_sqdb/beast_sqdb.h"
#include "beast/modules/beast_asio/beast_asio.h" #include "beast/modules/beast_asio/beast_asio.h"
#include "beast/beast/boost/ErrorCode.h"
namespace ripple { namespace ripple {
using namespace beast; using namespace beast;
} }
@@ -43,6 +45,7 @@ using namespace beast;
# include "impl/Tuning.h" # include "impl/Tuning.h"
# include "impl/Checker.h" # include "impl/Checker.h"
#include "impl/CheckerAdapter.h" #include "impl/CheckerAdapter.h"
# include "impl/CachedEndpoint.h"
#include "impl/EndpointCache.h" #include "impl/EndpointCache.h"
#include "impl/Slots.h" #include "impl/Slots.h"
#include "impl/Source.h" #include "impl/Source.h"

View File

@@ -307,8 +307,15 @@ private:
boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes()); boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes());
m_remoteAddress = IPEndpoint (IPEndpoint::V4 ( m_remoteAddress = IPEndpoint (IPEndpoint::V4 (
bytes[0], bytes[1], bytes[2], bytes[3]), 0); bytes[0], bytes[1], bytes[2], bytes[3]), 0);
if (! m_isInbound)
m_remoteAddress = m_remoteAddress.withPort (
getNativeSocket().remote_endpoint().port());
}
else
{
// TODO: Support ipv6
bassertfalse;
} }
m_remoteAddressSet = true; m_remoteAddressSet = true;
if (m_socket->getFlags ().set (MultiSocket::Flag::proxy) && m_isInbound) if (m_socket->getFlags ().set (MultiSocket::Flag::proxy) && m_isInbound)
@@ -1683,8 +1690,7 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
in_addr addr; in_addr addr;
addr.s_addr = tm.ipv4().ipv4(); addr.s_addr = tm.ipv4().ipv4();
IPEndpoint::V4 v4 (ntohl (addr.s_addr)); IPEndpoint::V4 v4 (ntohl (addr.s_addr));
endpoint.address = IPEndpoint (v4, 0); endpoint.address = IPEndpoint (v4, tm.ipv4().ipv4port ());
endpoint.port = tm.ipv4().ipv4port ();
} }
else else
{ {
@@ -1695,8 +1701,8 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
// by performing a connectivity test. // by performing a connectivity test.
// //
bassert (m_remoteAddressSet); bassert (m_remoteAddressSet);
endpoint.address = m_remoteAddress.withPort (0); endpoint.address = m_remoteAddress.withPort (
endpoint.port = tm.ipv4().ipv4port (); tm.ipv4().ipv4port ());
} }
// slots // slots

View File

@@ -149,7 +149,7 @@ public:
toNetworkByteOrder (ep.address.v4().value)); toNetworkByteOrder (ep.address.v4().value));
else else
tme.mutable_ipv4()->set_ipv4(0); tme.mutable_ipv4()->set_ipv4(0);
tme.mutable_ipv4()->set_ipv4port (ep.port); tme.mutable_ipv4()->set_ipv4port (ep.address.port());
tme.set_hops (ep.hops); tme.set_hops (ep.hops);
tme.set_slots (ep.incomingSlotsAvailable); tme.set_slots (ep.incomingSlotsAvailable);
@@ -718,8 +718,12 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer,
mConnectedMap[naPeer] = peer; mConnectedMap[naPeer] = peer;
bNew = true; bNew = true;
// Notify peerfinder since this is a connection that we didn't know about and are keeping // Notify peerfinder since this is a connection that we didn't
getPeerFinder ().onPeerConnected (RipplePublicKey (peer->getNodePublic()), peer->getPeerEndpoint(), peer->isInbound()); // know about and are keeping
//
getPeerFinder ().onPeerConnected (RipplePublicKey (
peer->getNodePublic()), peer->getPeerEndpoint(),
peer->isInbound());
assert (peer->getPeerId () != 0); assert (peer->getPeerId () != 0);
mPeerIdMap.insert (std::make_pair (peer->getPeerId (), peer)); mPeerIdMap.insert (std::make_pair (peer->getPeerId (), peer));