PeerFinder work

This commit is contained in:
Vinnie Falco
2013-10-05 11:59:17 -07:00
parent 500bddebff
commit 72681fa7fb
11 changed files with 136 additions and 75 deletions

View File

@@ -90,7 +90,7 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\EndpointCache.cpp">
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Cache.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
@@ -1644,7 +1644,7 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CachedEndpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\EndpointCache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Cache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpointCache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Logic.h" />

View File

@@ -1080,9 +1080,6 @@
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Slots.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\EndpointCache.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\SourceStrings.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
@@ -1092,6 +1089,9 @@
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Checker.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Cache.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\KeyCache.h">
@@ -2175,9 +2175,6 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\PeerInfo.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\EndpointCache.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Source.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
@@ -2217,6 +2214,9 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Cache.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CachedEndpoint.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>

View File

@@ -25,7 +25,7 @@ namespace PeerFinder {
/** The Endpoint cache holds the short-lived relayed Endpoint messages.
*/
class EndpointCache
class Cache
{
private:
typedef boost::unordered_map <
@@ -36,21 +36,52 @@ private:
Table m_now;
Table m_prev;
// Refresh the existing entry with a new message
void refresh (CachedEndpoint& entry, Endpoint const& message)
{
entry.message.hops = std::min (entry.message.hops, message.hops);
// Copy the other fields based on uptime
if (entry.message.uptimeMinutes < message.uptimeMinutes)
{
entry.message.incomingSlotsAvailable = message.incomingSlotsAvailable;
entry.message.incomingSlotsMax = message.incomingSlotsMax;
entry.message.uptimeMinutes = message.uptimeMinutes;
entry.message.featureList = message.featureList;
}
}
public:
explicit EndpointCache (Journal journal)
explicit Cache (Journal journal)
: m_journal (journal)
{
}
~EndpointCache ()
~Cache ()
{
}
// Insert or update an existing entry with the new message
//
void update (Endpoint const& ep)
// Cycle the tables
void cycle()
{
std::swap (m_now, m_prev);
m_now.clear();
}
// Insert or update an existing entry with the new message
void insert (Endpoint const& message)
{
Table::iterator iter (m_prev.find (message.address));
if (iter != m_prev.end())
{
}
else
{
std::pair <Table::iterator, bool> result (
m_now.emplace (message.address, message));
if (!result.second)
refresh (result.first->second, message);
}
}
};

View File

@@ -25,23 +25,12 @@ namespace PeerFinder {
struct CachedEndpoint
{
CachedEndpoint (Endpoint const& endpoint)
: hops (endpoint.hops)
, incomingSlotsAvailable (endpoint.incomingSlotsAvailable)
, incomingSlotsMax (endpoint.incomingSlotsMax)
, uptimeMinutes (endpoint.uptimeMinutes)
, featureList (endpoint.featureList)
CachedEndpoint (Endpoint const& message_)
: message (message_)
{
}
int hops;
uint32 incomingSlotsAvailable;
uint32 incomingSlotsMax;
uint32 uptimeMinutes;
std::string featureList;
// The peer closest to the endpoint, measured in hops.
PeerID origin;
Endpoint message;
};
}

View File

@@ -92,7 +92,7 @@ public:
// Our view of the current set of connected peers.
Peers m_peers;
EndpointCache m_cache;
Cache m_cache;
LegacyEndpointCache m_legacyCache;
@@ -204,9 +204,14 @@ public:
m_sources.push_back (source);
}
void onUpdate ()
// Called periodically to cycle and age the varioous caches.
//
void cycleCache()
{
m_journal.debug << "Processing Update";
m_cache.cycle();
for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter)
iter->received.cycle();
}
// Called when a peer connection is established.
@@ -301,7 +306,7 @@ public:
{
if (! m_peers.empty())
{
m_journal.debug << "Sending mtENDPOINTS";
m_journal.trace << "Sending mtENDPOINTS";
RelativeTime const now (RelativeTime::fromStartup());
@@ -313,7 +318,7 @@ public:
{
sendEndpoints (peer);
peer.whenSendEndpoints = now +
RelativeTime (secondsPerEndpoints);
RelativeTime (secondsPerMessage);
}
}
}
@@ -387,43 +392,64 @@ public:
{
m_journal.warning << "Charging peer " << peer.address <<
" for sending too many endpoints";
m_callback.chargePeerLoadPenalty(id);
}
// process the list
// Process each entry
//
int neighborCount (0);
for (std::vector <Endpoint>::const_iterator iter (list.begin());
iter != list.end(); ++iter)
{
bool foundNeighbor (false);
bool chargedPenalty (false);
for (std::vector <Endpoint>::const_iterator iter (list.begin());
iter != list.end(); ++iter)
Endpoint const& message (*iter);
// Remember that this peer gave us this address
peer.received.insert (message.address);
if (message.hops == 0)
{
Endpoint const& endpoint (*iter);
if (endpoint.hops == 0)
++neighborCount;
if (neighborCount == 1)
{
if (! foundNeighbor)
if (! peer.checked)
{
foundNeighbor = true;
// Test the peer's listening port if its the first time
if (! peer.checked)
m_checker.async_test (endpoint.address, bind (
&Logic::onCheckEndpoint, this, id,
endpoint.address, _1));
// Test the peer's listening port before
// adding it to the cache for the first time.
//
m_checker.async_test (message.address, bind (
&Logic::onCheckEndpoint, this, id,
message.address, _1));
// Note that we simply discard the first Endpoint
// that the neighbor sends when we perform the
// listening test. They will just send us another
// one in a few seconds.
}
else if (! chargedPenalty)
else if (peer.canAccept)
{
// Only charge them once (?)
chargedPenalty = true;
// More than one zero-hops message?!
m_journal.warning << "Charging peer " << peer.address <<
" for sending more than one hops==0 endpoint";
m_callback.chargePeerLoadPenalty (id);
// We only add to the cache if the neighbor passed the
// listening test, else we silently drop their message
// since their listening port is misconfigured.
//
m_cache.insert (message);
}
}
}
else
{
m_cache.insert (message);
}
}
peer.whenAcceptEndpoints = now + secondsPerEndpoints;
if (neighborCount > 1)
{
m_journal.warning << "Peer " << peer.address <<
" sent " << neighborCount << " entries with hops=0";
// VFALCO TODO Should we apply load charges?
}
peer.whenAcceptEndpoints = now + secondsPerMessage;
}
//--------------------------------------------------------------------------

View File

@@ -188,7 +188,8 @@ public:
CheckerAdapter m_checker;
Logic m_logic;
DeadlineTimer m_connectTimer;
DeadlineTimer m_endpointsTimer;
DeadlineTimer m_messageTimer;
DeadlineTimer m_cacheTimer;
//--------------------------------------------------------------------------
@@ -200,7 +201,8 @@ public:
, m_checker (m_queue)
, m_logic (callback, m_store, m_checker, journal)
, m_connectTimer (this)
, m_endpointsTimer (this)
, m_messageTimer (this)
, m_cacheTimer (this)
{
#if 1
#if BEAST_MSVC
@@ -283,7 +285,8 @@ public:
m_checker.cancel ();
m_logic.stop ();
m_connectTimer.cancel();
m_endpointsTimer.cancel();
m_messageTimer.cancel();
m_cacheTimer.cancel();
m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
}
@@ -296,10 +299,15 @@ public:
m_queue.dispatch (bind (&Logic::makeOutgoingConnections, &m_logic));
m_connectTimer.setExpiration (secondsPerConnect);
}
else if (timer == m_endpointsTimer)
else if (timer == m_messageTimer)
{
m_queue.dispatch (bind (&Logic::sendEndpoints, &m_logic));
m_endpointsTimer.setExpiration (secondsPerEndpoints);
m_messageTimer.setExpiration (secondsPerMessage);
}
else if (timer == m_cacheTimer)
{
m_queue.dispatch (bind (&Logic::cycleCache, &m_logic));
m_cacheTimer.setExpiration (cacheSecondsToLive);
}
}
@@ -326,8 +334,9 @@ public:
}
m_connectTimer.setExpiration (secondsPerConnect);
m_endpointsTimer.setExpiration (secondsPerEndpoints);
m_messageTimer.setExpiration (secondsPerMessage);
m_cacheTimer.setExpiration (cacheSecondsToLive);
m_queue.post (bind (&Logic::makeOutgoingConnections, &m_logic));
}

View File

@@ -23,7 +23,7 @@
namespace ripple {
namespace PeerFinder {
typedef AgedHistory <std::set <Endpoint> > Endpoints;
//typedef AgedHistory <std::set <Endpoint> > Endpoints;
//--------------------------------------------------------------------------
@@ -64,8 +64,12 @@ struct PeerInfo
//
RelativeTime mutable whenAcceptEndpoints;
// All the Endpoint records we have received from this peer
Endpoints mutable endpoints;
// The set of all recent IPEndpoint that we have seen from this peer.
// We try to avoid sending a peer the same addresses they gave us.
//
CycledSet <IPEndpoint,
IPEndpoint::hasher,
IPEndpoint::key_equal> mutable received;
};
}

View File

@@ -43,7 +43,7 @@ enum
//
// How often we send or accept mtENDPOINTS messages per peer
,secondsPerEndpoints = 5
,secondsPerMessage = 5
// How many Endpoint to send in each mtENDPOINTS
,numberOfEndpoints = 10
@@ -53,7 +53,7 @@ enum
// How long an Endpoint will stay in the cache
// This should be a small multiple of the broadcast frequency
,cachedEndpointSecondsToLive = 60
,cacheSecondsToLive = 60
//---------------------------------------------------------
//

View File

@@ -21,7 +21,7 @@
#include "ripple_peerfinder.h"
#include "../../ripple/types/api/AgedHistory.h"
#include "../../ripple/types/ripple_types.h"
#include <set>
@@ -45,8 +45,8 @@ using namespace beast;
# include "impl/Tuning.h"
# include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
# include "impl/CachedEndpoint.h"
#include "impl/EndpointCache.h"
# include "impl/CachedEndpoint.h"
#include "impl/Cache.h"
#include "impl/Slots.h"
#include "impl/Source.h"
#include "impl/SourceStrings.h"
@@ -60,7 +60,7 @@ using namespace beast;
#include "impl/Checker.cpp"
#include "impl/Config.cpp"
#include "impl/Endpoint.cpp"
#include "impl/EndpointCache.cpp"
#include "impl/Cache.cpp"
#include "impl/Manager.cpp"
#include "impl/Slots.cpp"
#include "impl/SourceStrings.cpp"

View File

@@ -112,8 +112,10 @@ public:
void preparePeerFinder()
{
PeerFinder::Config config;
// config.maxPeerCount = ?
#if RIPPLE_USE_PEERFINDER
config.maxPeerCount = 100;
#endif
config.wantIncoming =
(! getConfig ().PEER_PRIVATE) &&