PeerFinder work

This commit is contained in:
Nik Bougalis
2013-10-29 20:47:15 -07:00
committed by Vinnie Falco
parent e710bd2183
commit 42b841735e
16 changed files with 624 additions and 58 deletions

View File

@@ -102,6 +102,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile> </ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Resolver.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Slots.cpp"> <ClCompile Include="..\..\src\ripple\peerfinder\impl\Slots.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -1684,11 +1690,14 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Cache.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\Cache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Giveaways.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\GiveawaysAtHop.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpoint.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpointCache.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpointCache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Logic.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\Logic.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LogicType.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\LogicType.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\PeerInfo.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\PeerInfo.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Resolver.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Slots.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\Slots.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Source.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\Source.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\SourceStrings.h" /> <ClInclude Include="..\..\src\ripple\peerfinder\impl\SourceStrings.h" />

View File

@@ -1125,6 +1125,9 @@
<ClCompile Include="..\..\src\ripple\resource\impl\LegacyFees.cpp"> <ClCompile Include="..\..\src\ripple\resource\impl\LegacyFees.cpp">
<Filter>[1] Ripple\resource\impl</Filter> <Filter>[1] Ripple\resource\impl</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Resolver.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\KeyCache.h"> <ClInclude Include="..\..\src\ripple_basics\containers\KeyCache.h">
@@ -2304,6 +2307,15 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LogicType.h"> <ClInclude Include="..\..\src\ripple\peerfinder\impl\LogicType.h">
<Filter>[1] Ripple\peerfinder\impl</Filter> <Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Resolver.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Giveaways.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\GiveawaysAtHop.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto"> <CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -70,12 +70,30 @@ public:
virtual void addFallbackURL (std::string const& name, virtual void addFallbackURL (std::string const& name,
std::string const& url) = 0; std::string const& url) = 0;
/** Called when a new peer connection is established. /** Called when an (outgoing) connection attempt to a particular address
is about to begin.
*/
virtual void onPeerConnectAttemptBegins (IPAddress const& address) = 0;
/** Called when an (outgoing) connection attempt to a particular address
completes, whether it succeeds or fails.
*/
virtual void onPeerConnectAttemptCompletes (IPAddress const& address,
bool success) = 0;
/** Called when a new peer connection is established but before get
we exchange hello messages.
*/
virtual void onPeerConnected (IPAddress const& address,
bool inbound) = 0;
/** Called when a new peer connection is established after we exchange
hello messages.
Internally, we add the peer to our tracking table, validate that Internally, we add the peer to our tracking table, validate that
we can connect to it, and begin advertising it to others after we can connect to it, and begin advertising it to others after
we are sure that its connection is stable. we are sure that its connection is stable.
*/ */
virtual void onPeerConnected (PeerID const& id, virtual void onPeerHandshake (PeerID const& id,
IPAddress const& address, IPAddress const& address,
bool inbound) = 0; bool inbound) = 0;

View File

@@ -31,6 +31,9 @@ private:
typedef boost::unordered_map < typedef boost::unordered_map <
IPAddress, CachedEndpoint, IPAddress::hasher> Table; IPAddress, CachedEndpoint, IPAddress::hasher> Table;
typedef std::set <
IPAddress*, PtrCompareFunctor <IPAddress> > AddressSet;
Journal m_journal; Journal m_journal;
Table m_endpoints; Table m_endpoints;
@@ -39,9 +42,15 @@ private:
// in oldest-to-newest order. The oldest item is at the head. // in oldest-to-newest order. The oldest item is at the head.
List <CachedEndpoint> m_list; List <CachedEndpoint> m_list;
// A set of IP addresses which we know about
AddressSet m_addresses;
unsigned int m_generation;
public: public:
explicit Cache (Journal journal) explicit Cache (Journal journal)
: m_journal (journal) : m_journal (journal)
, m_generation(0)
{ {
} }
@@ -54,8 +63,7 @@ public:
return m_endpoints.size(); return m_endpoints.size();
} }
// Cycle the tables void sweep (DiscreteTime now)
void cycle(DiscreteTime now)
{ {
List <CachedEndpoint>::iterator iter (m_list.begin()); List <CachedEndpoint>::iterator iter (m_list.begin());
@@ -82,10 +90,16 @@ public:
std::pair <Table::iterator, bool> result ( std::pair <Table::iterator, bool> result (
m_endpoints.emplace (message.address, CachedEndpoint(message, now))); m_endpoints.emplace (message.address, CachedEndpoint(message, now)));
if (!result.second) CachedEndpoint& entry (result.first->second);
{ // There was already an entry for this endpoint. Update it.
CachedEndpoint& entry (result.first->second);
// We ignore messages that we receive at a higher hop count. We should
// consider having a counter that monotonically increases per reboot
// so that we can detect a server restart.
if (!result.second && (entry.message.hops > message.hops))
return;
if (!result.second)
{
entry.message.hops = std::min (entry.message.hops, message.hops); entry.message.hops = std::min (entry.message.hops, message.hops);
// Copy the other fields based on uptime // Copy the other fields based on uptime
@@ -103,8 +117,6 @@ public:
m_list.erase (m_list.iterator_to(entry)); m_list.erase (m_list.iterator_to(entry));
} }
CachedEndpoint& entry (result.first->second);
m_journal.debug << message.address << m_journal.debug << message.address <<
"valid " << entry.whenExpires << "valid " << entry.whenExpires <<
" (" << entry.message.incomingSlotsAvailable << " (" << entry.message.incomingSlotsAvailable <<
@@ -113,8 +125,8 @@ public:
m_list.push_back (entry); m_list.push_back (entry);
} }
// Returns all the known endpoints we have, sorted by distance (that is, // Get all known endpoints, sorted by distance (i.e. by hop).
// by hop). //
Giveaways getGiveawayList() Giveaways getGiveawayList()
{ {
Giveaways giveaway; Giveaways giveaway;
@@ -122,8 +134,7 @@ public:
for (List <CachedEndpoint>::iterator iter (m_list.begin()); for (List <CachedEndpoint>::iterator iter (m_list.begin());
iter != m_list.end(); iter++) iter != m_list.end(); iter++)
{ {
if (iter->message.hops < maxPeerHopCount) giveaway.add (*iter);
giveaway.add (*iter);
} }
return giveaway; return giveaway;

View File

@@ -33,6 +33,7 @@ class Giveaways
public: public:
typedef std::vector <GiveawaysAtHop>::iterator iterator; typedef std::vector <GiveawaysAtHop>::iterator iterator;
typedef std::vector <GiveawaysAtHop>::reverse_iterator reverse_iterator;
Giveaways() Giveaways()
: m_hopVector(maxPeerHopCount) : m_hopVector(maxPeerHopCount)
@@ -65,6 +66,8 @@ public:
m_shuffled = true; m_shuffled = true;
} }
// Provides an iterator that starts from hop 0 and goes all the way to
// the max hop.
iterator begin () iterator begin ()
{ {
return m_hopVector.begin(); return m_hopVector.begin();
@@ -74,6 +77,18 @@ public:
{ {
return m_hopVector.end(); return m_hopVector.end();
} }
// Provides an iterator that starts from the max hop and goes all the way
// down to hop 0.
reverse_iterator rbegin ()
{
return m_hopVector.rbegin();
}
reverse_iterator rend ()
{
return m_hopVector.rend();
}
}; };
} }

View File

@@ -47,13 +47,10 @@ public:
// that we will be returning. // that we will be returning.
void add (CachedEndpoint &endpoint) void add (CachedEndpoint &endpoint)
{ {
if (endpoint.message.hops < maxPeerHopCount) if (endpoint.color)
{ m_list.push_back(&endpoint);
if (endpoint.color) else
m_list.push_back(&endpoint); m_used.push_back(&endpoint);
else
m_used.push_back(&endpoint);
}
} }
// Shuffles the list of peers we are about to hand out. // Shuffles the list of peers we are about to hand out.
@@ -63,7 +60,7 @@ public:
} }
// Prepare to begin iterating over the entire set of peers again. // Prepare to begin iterating over the entire set of peers again.
void reset () bool reset ()
{ {
// We need to add any entries from the stale vector in the tail // We need to add any entries from the stale vector in the tail
// end of the fresh vector. We do not need to shuffle them. // end of the fresh vector. We do not need to shuffle them.
@@ -73,8 +70,19 @@ public:
m_used.clear(); m_used.clear();
} }
// We need to start from the beginning again. // And start iterating the list from the beginning.
m_position = m_list.begin(); m_position = m_list.begin();
// Return whether there is anything in this vector to iterate.
return !empty();
}
// Determines if we have any giveaways at the current hop could; if we
// do not you should not dereference the iterator returned from "begin" or
// "rbegin"
bool empty() const
{
return m_list.empty();
} }
// This is somewhat counterintuitive, but it doesn't really "begin" // This is somewhat counterintuitive, but it doesn't really "begin"

View File

@@ -55,7 +55,21 @@ typedef boost::multi_index_container <
*/ */
class Logic class Logic
{ {
private:
typedef std::set < IPAddress > IPAddressSet;
public: public:
template < class T, class C = std::less<T> >
struct PtrComparator
{
bool operator()(const T *x, const T *y) const
{
C comp;
return comp(*x, *y);
}
};
struct State struct State
{ {
State () State ()
@@ -102,6 +116,9 @@ public:
LegacyEndpointCache m_legacyCache; LegacyEndpointCache m_legacyCache;
// Our set of connection attempts currently in-progress
IPAddressSet m_attemptsInProgress;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
Logic ( Logic (
@@ -201,6 +218,40 @@ public:
return true; return true;
} }
// Return endpoints to which we want to try to make outgoing connections.
// We preferentially return endpoints which are far away from as to try to
// improve the algebraic connectivity of the network graph. For more see
// http://en.wikipedia.org/wiki/Algebraic_connectivity
//
void getNewOutboundEndpoints (int needed, std::vector <IPAddress>& list)
{
Giveaways giveaway (m_cache.getGiveawayList());
int count = 0;
for (Giveaways::reverse_iterator iter (giveaway.rbegin());
iter != giveaway.rend(); ++iter)
{
// Check whether we have anything at the current hop level.
iter->reset ();
for(GiveawaysAtHop::iterator iter2 (iter->begin());
iter2 != iter->end() && (count != needed); ++iter2)
{
CachedEndpoint *ep (*iter2);
// NIKB TODO we need to check whether this peer is already
// connected prior to just returning it and wasting time
// trying to establish a redundant connection.
if(ep->message.incomingSlotsAvailable != 0)
{
list.push_back(ep->message.address);
++count;
}
}
}
}
// If configured to make outgoing connections, do us in order // If configured to make outgoing connections, do us in order
// to bring us up to desired out count. // to bring us up to desired out count.
// //
@@ -215,7 +266,8 @@ public:
int const needed (std::min ( int const needed (std::min (
m_slots.outDesired - m_slots.outboundCount, m_slots.outDesired - m_slots.outboundCount,
int (maxAddressesPerAttempt))); int (maxAddressesPerAttempt)));
m_legacyCache.get (needed, list, get_now());
getNewOutboundEndpoints (needed, list);
} }
} }
@@ -288,29 +340,69 @@ public:
m_sources.push_back (source); m_sources.push_back (source);
} }
// Called periodically to cycle and age the various caches. // Called periodically to sweep the cache and remove aged out items.
// //
void cycleCache() void sweepCache ()
{ {
m_cache.cycle (get_now()); m_cache.sweep (get_now());
for (Peers::iterator iter (m_peers.begin()); for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter) iter != m_peers.end(); ++iter)
iter->received.cycle(); iter->received.cycle();
} }
void onPeerConnecting () // Called when an outbound connection attempt is started
//
void onPeerConnectAttemptBegins (IPAddress const& address)
{ {
std::pair <IPAddressSet::iterator, bool> ret =
m_attemptsInProgress.insert (address);
// We are always notified of connection attempts so if we think that
// something was in progress and a connection attempt begins then
// something is very wrong.
bassert (ret.second);
if (ret.second)
m_journal.debug << "Attempt for " << address << " is in progress";
else
m_journal.error << "Attempt for " << address << " was already in progress";
}
// Called when an outbound connection attempt completes
//
void onPeerConnectAttemptCompletes (IPAddress const& address, bool success)
{
IPAddressSet::size_type ret = m_attemptsInProgress.erase (address);
bassert (ret == 1);
if (ret == 1)
m_journal.debug << "Attempt for " << address <<
" completed: " << (success ? "success" : "failure");
else
m_journal.error << "Attempt for untracked " << address <<
" completed: " << (success ? "success" : "failure");
}
// Called when a peer connection is established but before a handshake
// occurs.
void onPeerConnected (IPAddress const& address, bool incoming)
{
m_journal.error << "Connected: " << address <<
(incoming ? " (incoming)" : " (outgoing)");
} }
// Called when a peer connection is established. // Called when a peer connection is established.
// We are guaranteed that the PeerID is not already in our map. // We are guaranteed that the PeerID is not already in our map.
// but we are *NOT* guaranteed that the IP isn't. So we need // but we are *NOT* guaranteed that the IP isn't. So we need
// to be careful. // to be careful.
void onPeerConnected (PeerID const& id, void onPeerHandshake (PeerID const& id,
IPAddress const& address, bool inbound) IPAddress const& address, bool inbound)
{ {
m_journal.debug << "Peer connected: " << address; m_journal.debug << "Handshake: " << address;
// If this is outgoing, record the success // If this is outgoing, record the success
if (! inbound) if (! inbound)
m_legacyCache.checked (address, true); m_legacyCache.checked (address, true);
@@ -332,14 +424,14 @@ public:
// Called when a peer is disconnected. // Called when a peer is disconnected.
// We are guaranteed to get this exactly once for each // We are guaranteed to get this exactly once for each
// corresponding call to onPeerConnected. // corresponding call to onPeerHandshake.
// //
void onPeerDisconnected (PeerID const& id) void onPeerDisconnected (PeerID const& id)
{ {
Peers::iterator iter (m_peers.find (id)); Peers::iterator iter (m_peers.find (id));
bassert (iter != m_peers.end()); bassert (iter != m_peers.end());
PeerInfo const& peer (*iter); PeerInfo const& peer (*iter);
m_journal.debug << "Peer disconnected: " << peer.address; m_journal.debug << "Disconnected: " << peer.address;
m_slots.dropPeer (m_config, peer.inbound); m_slots.dropPeer (m_config, peer.inbound);
// VFALCO NOTE Update fixed peers count (HACKED) // VFALCO NOTE Update fixed peers count (HACKED)
@@ -418,11 +510,12 @@ public:
while(iter2 != iter->end()) while(iter2 != iter->end())
{ {
// FIXME NIKB check if the peer wants to receive this endpoint // FIXME NIKB check if the peer wants to receive this
// and add it to the list of endpoints we will send if he does. // endpoint and add it to the list of endpoints we will
// send if he does.
if(false) if(false)
iter->erase(iter2); iter2 = iter->erase(iter2);
else else
++iter2; ++iter2;
} }
@@ -439,11 +532,11 @@ public:
{ {
if (! m_peers.empty()) if (! m_peers.empty())
{ {
m_journal.trace << "Sending endpoints to our peers"; m_journal.trace << "Sending endpoints...";
DiscreteTime const now (get_now()); DiscreteTime const now (get_now());
// fill in endpoints // fill in endpoints.
Giveaways giveaway(m_cache.getGiveawayList()); Giveaways giveaway(m_cache.getGiveawayList());
for (Peers::iterator iter (m_peers.begin()); for (Peers::iterator iter (m_peers.begin());
@@ -486,10 +579,10 @@ public:
peer.canAccept = result.canAccept; peer.canAccept = result.canAccept;
if (peer.canAccept) if (peer.canAccept)
m_journal.info << "Peer " << peer.address << m_journal.info << peer.address <<
" passed listening test"; " passed listening test";
else else
m_journal.warning << "Peer " << peer.address << m_journal.warning << peer.address <<
" cannot accept incoming connections"; " cannot accept incoming connections";
} }
else else
@@ -533,13 +626,13 @@ public:
// numberOfEndpoints peers in a single message // numberOfEndpoints peers in a single message
if (list.size() > numberOfEndpoints) if (list.size() > numberOfEndpoints)
{ {
m_journal.warning << "Charging peer " << peer.address << m_journal.warning << "Charging " << peer.address <<
" for sending too many endpoints"; " for sending too many endpoints";
m_callback.chargePeerLoadPenalty(id); m_callback.chargePeerLoadPenalty(id);
} }
m_journal.debug << "Peer " << peer.address << m_journal.debug << peer.address <<
" sent us " << list.size() << " endpoints."; " sent us " << list.size() << " endpoints.";
// Process each entry // Process each entry
@@ -553,7 +646,7 @@ public:
// Remember that this peer gave us this address // Remember that this peer gave us this address
peer.received.insert (message.address); peer.received.insert (message.address);
m_journal.debug << "Received peer " << message.address << m_journal.debug << message.address <<
" at " << message.hops << " hops."; " at " << message.hops << " hops.";
if (message.hops == 0) if (message.hops == 0)
@@ -601,7 +694,7 @@ public:
if (neighborCount > 1) if (neighborCount > 1)
{ {
m_journal.warning << "Peer " << peer.address << m_journal.warning << peer.address <<
" sent " << neighborCount << " entries with hops=0"; " sent " << neighborCount << " entries with hops=0";
// VFALCO TODO Should we apply load charges? // VFALCO TODO Should we apply load charges?
} }

View File

@@ -299,19 +299,36 @@ public:
// VFALCO TODO This needs to be implemented // VFALCO TODO This needs to be implemented
} }
void onPeerConnecting () void onPeerConnectAttemptBegins (IPAddress const& address)
{ {
m_queue.dispatch ( m_queue.dispatch (
m_context.wrap ( m_context.wrap (
bind (&Logic::onPeerConnecting, &m_logic))); bind (&Logic::onPeerConnectAttemptBegins, &m_logic,
address)));
} }
void onPeerConnected (PeerID const& id, void onPeerConnectAttemptCompletes (IPAddress const& address, bool success)
IPAddress const& address, bool incoming) {
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerConnectAttemptCompletes, &m_logic,
address, success)));
}
void onPeerConnected (const IPAddress &address, bool incoming)
{ {
m_queue.dispatch ( m_queue.dispatch (
m_context.wrap ( m_context.wrap (
bind (&Logic::onPeerConnected, &m_logic, bind (&Logic::onPeerConnected, &m_logic,
address, incoming)));
}
void onPeerHandshake (PeerID const& id,
IPAddress const& address, bool incoming)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerHandshake, &m_logic,
id, address, incoming))); id, address, incoming)));
} }
@@ -402,7 +419,7 @@ public:
{ {
m_queue.dispatch ( m_queue.dispatch (
m_context.wrap ( m_context.wrap (
bind (&Logic::cycleCache, &m_logic))); bind (&Logic::sweepCache, &m_logic)));
m_cacheTimer.setExpiration (cacheSecondsToLive); m_cacheTimer.setExpiration (cacheSecondsToLive);
} }

View File

@@ -30,6 +30,24 @@ namespace PeerFinder {
// we keep one of these for each connected peer // we keep one of these for each connected peer
struct PeerInfo struct PeerInfo
{ {
enum State
{
// Some peculiar, unknown state
stateUnknown,
// A connection attempt is in progress
stateConnecting,
// A connection has been established but no handshake yet
stateConnected,
// A connection has been established and the handshake has completed
stateEstablished,
// A connection (of some kind) that is being torn down
stateDisconnecting
};
PeerInfo (PeerID const& id_, PeerInfo (PeerID const& id_,
IPAddress const& address_, IPAddress const& address_,
bool inbound_, bool inbound_,
@@ -37,9 +55,11 @@ struct PeerInfo
: id (id_) : id (id_)
, address (address_) , address (address_)
, inbound (inbound_) , inbound (inbound_)
, fixed (false)
, checked (inbound_ ? false : true) , checked (inbound_ ? false : true)
, canAccept (inbound_ ? false : true) , canAccept (inbound_ ? false : true)
, connectivityCheckInProgress (false) , connectivityCheckInProgress (false)
, peerState (stateUnknown)
, whenSendEndpoints (now) , whenSendEndpoints (now)
, whenAcceptEndpoints (now) , whenAcceptEndpoints (now)
{ {
@@ -49,6 +69,9 @@ struct PeerInfo
IPAddress address; IPAddress address;
bool inbound; bool inbound;
// Set to indicate that this is a fixed peer.
bool fixed;
// Tells us if we checked the connection. Outbound connections // Tells us if we checked the connection. Outbound connections
// are always considered checked since we successfuly connected. // are always considered checked since we successfuly connected.
bool mutable checked; bool mutable checked;
@@ -61,6 +84,9 @@ struct PeerInfo
// progress. Valid always. // progress. Valid always.
bool mutable connectivityCheckInProgress; bool mutable connectivityCheckInProgress;
// Indicates the state for this peer
State peerState;
// The time after which we will send the peer mtENDPOINTS // The time after which we will send the peer mtENDPOINTS
DiscreteTime mutable whenSendEndpoints; DiscreteTime mutable whenSendEndpoints;

View File

@@ -0,0 +1,46 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_PTRCOMPAREFUNC_H_INCLUDED
#define RIPPLE_PEERFINDER_PTRCOMPAREFUNC_H_INCLUDED
namespace ripple {
namespace PeerFinder {
//------------------------------------------------------------------------------
/** Compare two instances of a class of type T using the comparator specified
by class C via pointers. This does not compare the pointers themselves but
what the pointers point to.
*/
template < class T, class C = std::less<T> >
struct PtrCompareFunctor
{
bool operator()(T const *lhs, T const *rhs) const
{
C comp;
return comp(*lhs, *rhs);
}
};
}
}
#endif

View File

@@ -0,0 +1,185 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#if 0
namespace ripple {
namespace PeerFinder {
class ResolverImp
: public Resolver
, private Thread
, private LeakChecked <ResolverImp>
{
private:
class Request;
struct State
{
List <Request> list;
};
typedef SharedData <State> SharedState;
SharedState m_state;
boost::asio::io_service m_io_service;
boost::optional <boost::asio::io_service::work> m_work;
//--------------------------------------------------------------------------
static boost::asio::ip::tcp::endpoint fromIPAddress (
IPAddress const& ipEndpoint)
{
if (ipEndpoint.isV4 ())
{
return boost::asio::ip::tcp::endpoint (
boost::asio::ip::address_v4 (
ipEndpoint.v4().value),
ipEndpoint.port ());
}
bassertfalse;
return boost::asio::ip::tcp::endpoint ();
}
//--------------------------------------------------------------------------
class Request
: public SharedObject
, public List <Request>::Node
, private LeakChecked <Request>
{
public:
typedef SharedPtr <Request> Ptr;
typedef boost::asio::ip::tcp Protocol;
typedef boost::system::error_code error_code;
typedef Protocol::socket socket_type;
typedef Protocol::endpoint endpoint_type;
ResolverImp& m_owner;
boost::asio::io_service& m_io_service;
IPAddress m_address;
AbstractHandler <void (Result)> m_handler;
socket_type m_socket;
boost::system::error_code m_error;
bool m_canAccept;
Request (ResolverImp& owner, boost::asio::io_service& io_service,
IPAddress const& address, AbstractHandler <void (Result)> handler)
: m_owner (owner)
, m_io_service (io_service)
, m_address (address)
, m_handler (handler)
, m_socket (m_io_service)
, m_canAccept (false)
{
m_owner.add (*this);
m_socket.async_connect (fromIPAddress (m_address),
wrapHandler (boost::bind (&Request::handle_connect, Ptr(this),
boost::asio::placeholders::error), m_handler));
}
~Request ()
{
Result result;
result.address = m_address;
result.error = m_error;
m_io_service.wrap (m_handler) (result);
m_owner.remove (*this);
}
void cancel ()
{
m_socket.cancel();
}
void handle_connect (boost::system::error_code ec)
{
m_error = ec;
if (ec)
return;
m_canAccept = true;
}
};
//--------------------------------------------------------------------------
void add (Request& request)
{
SharedState::Access state (m_state);
state->list.push_back (request);
}
void remove (Request& request)
{
SharedState::Access state (m_state);
state->list.erase (state->list.iterator_to (request));
}
void run ()
{
m_io_service.run ();
}
public:
ResolverImp ()
: Thread ("PeerFinder::Resolver")
, m_work (boost::in_place (boost::ref (m_io_service)))
{
startThread ();
}
~ResolverImp ()
{
// cancel pending i/o
cancel();
// destroy the io_service::work object
m_work = boost::none;
// signal and wait for the thread to exit gracefully
stopThread ();
}
void cancel ()
{
SharedState::Access state (m_state);
for (List <Request>::iterator iter (state->list.begin());
iter != state->list.end(); ++iter)
iter->cancel();
}
void async_test (IPAddress const& endpoint,
AbstractHandler <void (Result)> handler)
{
new Request (*this, m_io_service, endpoint, handler);
}
};
//------------------------------------------------------------------------------
Resolver* Resolver::New ()
{
return new ResolverImp;
}
}
}
#endif

View File

@@ -0,0 +1,88 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_RESOLVER_H_INCLUDED
#define RIPPLE_PEERFINDER_RESOLVER_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Performs asynchronous domain name resolution. */
class Resolver
{
public:
/** Create the service.
This will automatically start the associated thread and io_service.
*/
static Resolver* New ();
/** Destroy the service.
Any pending I/O operations will be canceled. This call blocks until
all pending operations complete (either with success or with
operation_aborted) and the associated thread and io_service have
no more work remaining.
*/
virtual ~Resolver () { }
/** Cancel pending I/O.
This issues cancel orders for all pending I/O operations and then
returns immediately. Handlers will receive operation_aborted errors,
or if they were already queued they will complete normally.
*/
virtual void cancel () = 0;
struct Result
{
Result ()
{ }
/** The original name string */
std::string name;
/** The error code from the operation. */
boost::system::error_code error;
/** The resolved address.
Only defined if there is no error.
If the original name string contains a port specification,
it will be set in the resolved IPAddress.
*/
IPAddress address;
};
/** Performs an async resolution on the specified name.
The port information, if present, will be passed through.
*/
template <typename Handler>
void async_resolve (std::string const& name,
BEAST_MOVE_ARG(Handler) handler)
{
async_resolve (name,
AbstractHandler <void (Result)> (
BEAST_MOVE_CAST(Handler)(handler)));
}
virtual void async_resolve (std::string const& name,
AbstractHandler <void (Result)> handler) = 0;
};
}
}
#endif

View File

@@ -46,10 +46,12 @@ using namespace beast;
#include "impl/PrivateTypes.h" #include "impl/PrivateTypes.h"
# include "impl/Tuning.h" # include "impl/Tuning.h"
# include "impl/Checker.h" # include "impl/Checker.h"
# include "impl/Resolver.h"
#include "impl/CheckerAdapter.h" #include "impl/CheckerAdapter.h"
# include "impl/CachedEndpoint.h" # include "impl/CachedEndpoint.h"
# include "impl/GiveawaysAtHop.h" # include "impl/GiveawaysAtHop.h"
# include "impl/Giveaways.h" # include "impl/Giveaways.h"
#include "impl/PtrCompareFunctor.h"
#include "impl/Cache.h" #include "impl/Cache.h"
#include "impl/Slots.h" #include "impl/Slots.h"
#include "impl/Source.h" #include "impl/Source.h"
@@ -67,6 +69,7 @@ using namespace beast;
#include "impl/Endpoint.cpp" #include "impl/Endpoint.cpp"
#include "impl/Cache.cpp" #include "impl/Cache.cpp"
#include "impl/Manager.cpp" #include "impl/Manager.cpp"
#include "impl/Resolver.cpp"
#include "impl/Slots.cpp" #include "impl/Slots.cpp"
#include "impl/SourceStrings.cpp" #include "impl/SourceStrings.cpp"
#include "impl/Tests.cpp" #include "impl/Tests.cpp"

View File

@@ -332,6 +332,8 @@ private:
else else
m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress);
getApp ().getPeers ().peerConnected(m_remoteAddress, m_isInbound);
// Must compute mCookieHash before receiving a hello. // Must compute mCookieHash before receiving a hello.
sendHello (); sendHello ();
startReadHeader (); startReadHeader ();
@@ -378,6 +380,8 @@ private:
else else
m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress);
getApp ().getPeers ().peerConnected(m_remoteAddress, m_isInbound);
// Must compute mCookieHash before receiving a hello. // Must compute mCookieHash before receiving a hello.
sendHello (); sendHello ();
startReadHeader (); startReadHeader ();
@@ -624,7 +628,13 @@ void PeerImp::connect (const std::string& strIp, int iPort)
if (!err) if (!err)
{ {
WriteLog (lsINFO, Peer) << "Peer: Connect: Outbound: " << addressToString (this) << ": " << mIpPort.first << " " << mIpPort.second; WriteLog (lsINFO, Peer) << "Peer: Connect: Outbound: " <<
addressToString (this) << ": " <<
mIpPort.first << " " << mIpPort.second;
// Notify peer finder that we have a connection attempt in-progress
getApp ().getPeers ().getPeerFinder ().onPeerConnectAttemptBegins(
IPAddress::from_string(strIp).withPort(iPortAct) );
boost::asio::async_connect ( boost::asio::async_connect (
getNativeSocket (), getNativeSocket (),
@@ -640,14 +650,23 @@ void PeerImp::connect (const std::string& strIp, int iPort)
// Connect ssl as client. // Connect ssl as client.
void PeerImp::handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it) void PeerImp::handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it)
{ {
// Notify peer finder about the status of this in-progress connection attempt
getApp ().getPeers ().getPeerFinder ().onPeerConnectAttemptCompletes(
IPAddress::from_string(getIP()).withPort(getPort()), !error );
if (error) if (error)
{ {
WriteLog (lsINFO, Peer) << "Peer: Connect: Error: " << error.category ().name () << ": " << error.message () << ": " << error; WriteLog (lsINFO, Peer) << "Peer: Connect: Error: " <<
getIP() << ":" << getPort() <<
" (" << error.category ().name () <<
": " << error.message () <<
": " << error << ")";
detach ("hc", true); detach ("hc", true);
} }
else else
{ {
WriteLog (lsINFO, Peer) << "Connect peer: success."; WriteLog (lsINFO, Peer) << "Peer: Connect: Success: " <<
getIP() << ":" << getPort();
getHandshakeStream ().set_verify_mode (boost::asio::ssl::verify_none); getHandshakeStream ().set_verify_mode (boost::asio::ssl::verify_none);
@@ -1158,7 +1177,7 @@ void PeerImp::recvHello (protocol::TMHello& packet)
getApp().getPeers ().peerVerified (shared_from_this ()); getApp().getPeers ().peerVerified (shared_from_this ());
} }
if (! getApp().getPeers ().peerConnected (shared_from_this (), mNodePublic, getIP (), getPort ())) if (! getApp().getPeers ().peerHandshake (shared_from_this (), mNodePublic, getIP (), getPort ()))
{ {
// Already connected, self, or some other reason. // Already connected, self, or some other reason.
WriteLog (lsINFO, Peer) << "Recv(Hello): Disconnect: Extraneous connection."; WriteLog (lsINFO, Peer) << "Recv(Hello): Disconnect: Extraneous connection.";

View File

@@ -116,9 +116,7 @@ public:
{ {
PeerFinder::Config config; PeerFinder::Config config;
#if RIPPLE_USE_PEERFINDER
config.maxPeerCount = getConfig ().PEERS_MAX; config.maxPeerCount = getConfig ().PEERS_MAX;
#endif
config.wantIncoming = config.wantIncoming =
(! getConfig ().PEER_PRIVATE) && (! getConfig ().PEER_PRIVATE) &&
@@ -271,9 +269,18 @@ public:
bool getTopNAddrs (int n, std::vector<std::string>& addrs); bool getTopNAddrs (int n, std::vector<std::string>& addrs);
bool savePeer (const std::string& strIp, int iPort, char code); bool savePeer (const std::string& strIp, int iPort, char code);
// disconnect the specified peer
void disconnectPeer (PeerFinder::PeerID const &id, bool graceful)
{
// NIKB TODO
}
// A peer connected but we only have the IP address so far.
void peerConnected (const IPAddress& address, bool incoming);
// We know peers node public key. // We know peers node public key.
// <-- bool: false=reject // <-- bool: false=reject
bool peerConnected (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort); bool peerHandshake (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort);
// No longer connected. // No longer connected.
void peerDisconnected (Peer::ref peer, const RippleAddress& naPeer); void peerDisconnected (Peer::ref peer, const RippleAddress& naPeer);
@@ -641,7 +648,7 @@ void PeersImp::connectTo (const std::string& strIp, int iPort)
// <-- true, if already connected. // <-- true, if already connected.
Peer::pointer PeersImp::peerConnect (const std::string& strIp, int iPort) Peer::pointer PeersImp::peerConnect (const std::string& strIp, int iPort)
{ {
IPAndPortNumber pipPeer = make_pair (strIp, iPort); IPAndPortNumber pipPeer = make_pair (strIp, iPort);
Peer::pointer ppResult; Peer::pointer ppResult;
{ {
@@ -716,9 +723,14 @@ uint64 PeersImp::assignPeerId ()
return ++mLastPeer; return ++mLastPeer;
} }
void PeersImp::peerConnected (const IPAddress& address, bool incoming)
{
getPeerFinder ().onPeerConnected (address, incoming);
}
// Now know peer's node public key. Determine if we want to stay connected. // Now know peer's node public key. Determine if we want to stay connected.
// <-- bNew: false = redundant // <-- bNew: false = redundant
bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer, bool PeersImp::peerHandshake (Peer::ref peer, const RippleAddress& naPeer,
const std::string& strIP, int iPort) const std::string& strIP, int iPort)
{ {
bool bNew = false; bool bNew = false;
@@ -745,7 +757,7 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer,
// Notify peerfinder since this is a connection that we didn't // Notify peerfinder since this is a connection that we didn't
// know about and are keeping // know about and are keeping
// //
getPeerFinder ().onPeerConnected (RipplePublicKey ( getPeerFinder ().onPeerHandshake (RipplePublicKey (
peer->getNodePublic()), peer->getPeerEndpoint(), peer->getNodePublic()), peer->getPeerEndpoint(),
peer->isInbound()); peer->isInbound());

View File

@@ -62,9 +62,13 @@ public:
virtual bool getTopNAddrs (int n, std::vector<std::string>& addrs) = 0; virtual bool getTopNAddrs (int n, std::vector<std::string>& addrs) = 0;
virtual bool savePeer (const std::string& strIp, int iPort, char code) = 0; virtual bool savePeer (const std::string& strIp, int iPort, char code) = 0;
// A peer connection has been established, but we know nothing about it at
// this point beyond the IP address.
virtual void peerConnected (const IPAddress& address, bool incoming) = 0;
// We know peers node public key. // We know peers node public key.
// <-- bool: false=reject // <-- bool: false=reject
virtual bool peerConnected (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) = 0; virtual bool peerHandshake (Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) = 0;
// No longer connected. // No longer connected.
virtual void peerDisconnected (Peer::ref peer, const RippleAddress& naPeer) = 0; virtual void peerDisconnected (Peer::ref peer, const RippleAddress& naPeer) = 0;