Peerfinder work

This commit is contained in:
Nik Bougalis
2013-10-18 17:46:34 -07:00
committed by Vinnie Falco
parent 5dda088335
commit 6c17002e8a
18 changed files with 388 additions and 50 deletions

View File

@@ -168,7 +168,7 @@
// messages as needed, and collect legacy IP endpoint information.
//
#ifndef RIPPLE_USE_PEERFINDER
#define RIPPLE_USE_PEERFINDER 0
#define RIPPLE_USE_PEERFINDER 1
#endif
// Here temporarily

View File

@@ -40,6 +40,9 @@ struct Config
uint16 listeningPort;
std::string featureList;
/** Write the configuration into a property stream */
void onWrite(PropertyStream::Map& map);
};
}

View File

@@ -70,8 +70,7 @@ public:
// we remove it from the table.
iter = m_list.erase(iter);
m_journal.debug << "Cache entry for " <<
ep.message.address << " expired.";
m_journal.debug << ep.message.address << " expired.";
m_endpoints.erase (ep.message.address);
}
@@ -106,13 +105,29 @@ public:
CachedEndpoint& entry (result.first->second);
m_journal.debug << "Cache entry for " << message.address <<
" is valid until " << entry.whenExpires <<
m_journal.debug << message.address <<
"valid " << entry.whenExpires <<
" (" << entry.message.incomingSlotsAvailable <<
"/" << entry.message.incomingSlotsMax << ")";
m_list.push_back (entry);
}
// Returns all the known endpoints we have, sorted by distance (that is,
// by hop).
Giveaways getGiveawayList()
{
Giveaways giveaway;
for (List <CachedEndpoint>::iterator iter (m_list.begin());
iter != m_list.end(); iter++)
{
if (iter->message.hops < maxPeerHopCount)
giveaway.add (*iter);
}
return giveaway;
}
};
}

View File

@@ -27,12 +27,18 @@ struct CachedEndpoint : public List<CachedEndpoint>::Node
{
CachedEndpoint (Endpoint const& message_, DiscreteTime now)
: message (message_)
, whenExpires(now + cacheSecondsToLive)
, whenExpires (now + cacheSecondsToLive)
, color (true)
{
}
Endpoint message;
DiscreteTime whenExpires;
// The color indicates whether this peer was recently sent out or not. It
// is recently sent out if the color of the peer matches the color assigned
// in PeerFinder tables.
bool color;
};
}

View File

@@ -28,5 +28,16 @@ Config::Config ()
{
}
void Config::onWrite(PropertyStream::Map &map)
{
map ["min_out_count"] = minOutCount;
map ["out_percent"] = outPercent;
map ["max_peer_count"] = maxPeerCount;
map ["want_incoming"] = wantIncoming;
map ["connect_automatically"] = connectAutomatically;
map ["listening_port"] = listeningPort;
map ["feature_list"] = featureList;
}
}
}

View File

@@ -0,0 +1,82 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_GIVEAWAYS_H_INCLUDED
#define RIPPLE_PEERFINDER_GIVEAWAYS_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** The Giveaways holds a vector of HopVectors, one of each hop.
*/
class Giveaways
{
std::vector <GiveawaysAtHop> m_hopVector;
bool m_shuffled;
public:
typedef std::vector <GiveawaysAtHop>::iterator iterator;
Giveaways()
: m_hopVector(maxPeerHopCount)
, m_shuffled(false)
{
}
// Add the endpoint to the appropriate hop vector.
void add (CachedEndpoint &endpoint)
{
if (endpoint.message.hops < maxPeerHopCount)
m_hopVector[endpoint.message.hops].add(endpoint);
}
// Resets the Giveaways, preparing to allow a new peer to iterate over it.
void reset ()
{
for (size_t i = 0; i != m_hopVector.size(); i++)
{
if (!m_shuffled)
m_hopVector[i].shuffle ();
m_hopVector[i].reset ();
}
// Once this has been called, the hop vectors have all been shuffled
// and we do not need to shuffle them again for the lifetime of this
// instance.
m_shuffled = true;
}
iterator begin ()
{
return m_hopVector.begin();
}
iterator end ()
{
return m_hopVector.end();
}
};
}
}
#endif

View File

@@ -0,0 +1,118 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_GIVEAWAYSATHOP_H_INCLUDED
#define RIPPLE_PEERFINDER_GIVEAWAYSATHOP_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** A GiveawaysAtHop contains a list of all the endpoints that are a particular
number of hops away from us.
*/
class GiveawaysAtHop
{
public:
typedef std::vector<CachedEndpoint*>::iterator iterator;
private:
// List of endpoints that haven't been seen during this iteration
std::vector <CachedEndpoint*> m_list;
// List of endpoints that have been used during this iteration
std::vector <CachedEndpoint*> m_used;
// This iterator tracks where we are in the list between calls. It is
// set to the beginning of the list by calling reset().
iterator m_position;
public:
// This function adds a new endpoint to the list of endpoints
// that we will be returning.
void add (CachedEndpoint &endpoint)
{
if (endpoint.message.hops < maxPeerHopCount)
{
if (endpoint.color)
m_list.push_back(&endpoint);
else
m_used.push_back(&endpoint);
}
}
// Shuffles the list of peers we are about to hand out.
void shuffle ()
{
std::random_shuffle (m_list.begin (), m_list.end ());
}
// Prepare to begin iterating over the entire set of peers again.
void reset ()
{
// We need to add any entries from the stale vector in the tail
// end of the fresh vector. We do not need to shuffle them.
if (!m_used.empty())
{
m_list.insert(m_list.end (), m_used.begin (), m_used.end ());
m_used.clear();
}
// We need to start from the beginning again.
m_position = m_list.begin();
}
// This is somewhat counterintuitive, but it doesn't really "begin"
// iteration, but allows us to resume it.
iterator begin ()
{
return m_position;
}
// The iterator to the last fresh endpoint we have available. Once we get
// to this point, we have provided this peer with all endpoints in our list.
iterator end ()
{
return m_list.end();
}
// Removes the specified item from the "fresh" list of endpoints and returns
// an iterator to the next one to use. This means that the peer decided
// to use this iterator.
iterator erase (iterator iter)
{
// NIKB FIXME change node color, if it's going from fresh to stale
m_used.push_back(*iter);
return m_list.erase(iter);
}
// Reserves entries to allow inserts to be efficient.
void reserve (size_t n)
{
m_used.reserve (n);
m_list.reserve (n);
}
};
}
}
#endif

View File

@@ -85,9 +85,6 @@ public:
Config m_config;
// The number of fixed peers that are currently connected
int m_fixedPeersConnected;
// A list of peers that should always be connected
typedef std::set <IPAddress> FixedPeers;
FixedPeers m_fixedPeers;
@@ -118,7 +115,6 @@ public:
, m_store (store)
, m_checker (checker)
, m_journal (journal)
, m_fixedPeersConnected (0)
, m_slots (clock)
, m_cache (journal)
, m_legacyCache (store, journal)
@@ -144,6 +140,27 @@ public:
state->fetchSource->cancel ();
}
// Output statistics
void onWrite (PropertyStream::Map& map)
{
// VFALCO NOTE These ugly casts are needed because
// of how std::size_t is declared on some linuxes
//
map ["cache"] = uint32(m_cache.size());
map ["legacy"] = uint32(m_legacyCache.size());
map ["fixed_desired"] = uint32 (m_fixedPeers.size());
{
PropertyStream::Map child ("slots", map);
m_slots.onWrite (child);
}
{
PropertyStream::Map child ("config", map);
m_config.onWrite (child);
}
}
//--------------------------------------------------------------------------
// Load persistent state information from the Store
@@ -187,30 +204,29 @@ public:
//
void makeOutgoingConnections ()
{
if (m_config.connectAutomatically)
return;
std::vector <IPAddress> list;
if (m_slots.outDesired > m_slots.outboundCount)
if (m_config.connectAutomatically)
{
int const needed (std::min (
m_slots.outDesired - m_slots.outboundCount,
int (maxAddressesPerAttempt)));
m_legacyCache.get (needed, list, get_now());
if (m_slots.outDesired > m_slots.outboundCount)
{
int const needed (std::min (
m_slots.outDesired - m_slots.outboundCount,
int (maxAddressesPerAttempt)));
m_legacyCache.get (needed, list, get_now());
}
}
if (m_fixedPeersConnected < m_fixedPeers.size())
if (m_slots.fixedCount < m_fixedPeers.size())
{
list.reserve (list.size() + m_fixedPeers.size() - m_fixedPeersConnected);
list.reserve (list.size() + m_fixedPeers.size() - m_slots.fixedCount);
for (FixedPeers::const_iterator iter (m_fixedPeers.begin());
iter != m_fixedPeers.end(); ++iter)
{
if (m_peers.get<1>().find (*iter) != m_peers.get<1>().end())
{
// Make sure the fixed peer is not already connected
if (m_peers.get<1>().find (*iter) == m_peers.get<1>().end())
list.push_back (*iter);
}
}
}
@@ -234,17 +250,26 @@ public:
{
for (std::vector <std::string>::const_iterator iter (strings.begin());
iter != strings.end(); ++iter)
{
{
IPAddress ep (IPAddress::from_string (*iter));
if (ep.empty ())
ep = IPAddress::from_string_altform(*iter);
if (! ep.empty ())
{
m_fixedPeers.insert (ep);
m_journal.info << "Added fixed peer " << *iter;
}
else
{
// VFALCO TODO Attempt name resolution
m_journal.error << "Failed to resolve: '" << *iter << "'";
}
}
m_journal.info << m_fixedPeers.size() << " fixed peers added.";
}
void addStaticSource (SharedPtr <Source> const& source)
@@ -289,6 +314,14 @@ public:
PeerInfo (id, address, inbound, get_now())));
bassert (result.second);
m_slots.addPeer (m_config, inbound);
// VFALCO NOTE Update fixed peers count (HACKED)
for (FixedPeers::const_iterator iter (m_fixedPeers.begin());
iter != m_fixedPeers.end(); ++iter)
{
if (iter->withPort (0) == address.withPort (0))
++m_slots.fixedCount;
}
}
// Called when a peer is disconnected.
@@ -302,6 +335,16 @@ public:
PeerInfo const& peer (*iter);
m_journal.debug << "Peer disconnected: " << peer.address;
m_slots.dropPeer (m_config, peer.inbound);
// VFALCO NOTE Update fixed peers count (HACKED)
for (FixedPeers::const_iterator iter (m_fixedPeers.begin());
iter != m_fixedPeers.end(); ++iter)
{
if (iter->withPort (0) == peer.address.withPort (0))
--m_slots.fixedCount;
}
// Must come last
m_peers.erase (iter);
}
@@ -345,17 +388,41 @@ public:
// Send mtENDPOINTS for the specified peer
//
void sendEndpoints (PeerInfo const& peer)
void sendEndpoints (PeerInfo const& peer, Giveaways &giveaway)
{
typedef std::vector <Endpoint> List;
std::vector <Endpoint> endpoints;
// fill in endpoints
// Add us to the list if we want incoming
if (m_slots.inboundSlots > 0)
// VFALCO TODO Reconsider this logic
//if (m_slots.inboundSlots > 0)
if (m_config.wantIncoming)
endpoints.push_back (thisEndpoint ());
// We iterate over the hop list we have, adding one
// peer per hop (if possible) until we add the maximum
// number of peers we are allowed to send or we can't
// send anything else.
for (int i = 0; i != numberOfEndpoints; ++i)
{
for (Giveaways::iterator iter = giveaway.begin();
iter != giveaway.end(); ++iter)
{
GiveawaysAtHop::iterator iter2 = iter->begin();
while(iter2 != iter->end())
{
// FIXME NIKB check if the peer wants to receive this endpoint
// and add it to the list of endpoints we will send if he does.
if(false)
iter->erase(iter2);
else
++iter2;
}
}
}
if (! endpoints.empty())
m_callback.sendPeerEndpoints (peer.id, endpoints);
}
@@ -370,13 +437,20 @@ public:
DiscreteTime const now (get_now());
// fill in endpoints
Giveaways giveaway(m_cache.getGiveawayList());
for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter)
{
PeerInfo const& peer (*iter);
// Reset the giveaway to begin a fresh iteration.
giveaway.reset ();
if (peer.whenSendEndpoints <= now)
{
sendEndpoints (peer);
sendEndpoints (peer, giveaway);
peer.whenSendEndpoints = now +
secondsPerMessage;
}

View File

@@ -205,7 +205,7 @@ public:
, m_messageTimer (this)
, m_cacheTimer (this)
{
#if 0
#if 1
#if BEAST_MSVC
if (beast_isRunningUnderDebugger())
{
@@ -322,7 +322,9 @@ public:
m_connectTimer.cancel();
m_messageTimer.cancel();
m_cacheTimer.cancel();
m_queue.dispatch (bind (&Thread::signalThreadShouldExit, this));
m_queue.dispatch (
m_context.wrap (
bind (&Thread::signalThreadShouldExit, this)));
}
//--------------------------------------------------------------------------
@@ -334,16 +336,7 @@ public:
{
SerializedContext::Scope scope (m_context);
map ["peers"] = m_logic.m_slots.peerCount;
map ["in"] = m_logic.m_slots.inboundCount;
map ["out"] = m_logic.m_slots.outboundCount;
map ["out_desired"] = m_logic.m_slots.outDesired;
map ["in_avail"] = m_logic.m_slots.inboundSlots;
map ["in_max"] = m_logic.m_slots.inboundSlotsMaximum;
map ["uptime"] = m_logic.m_slots.uptimeSeconds();
map ["round"] = m_logic.m_slots.roundUpwards();
map ["cache"] = uint32(m_logic.m_cache.size());
map ["legacy"] = uint32(m_logic.m_legacyCache.size());
m_logic.onWrite (map);
}
//--------------------------------------------------------------------------

View File

@@ -21,15 +21,15 @@ namespace ripple {
namespace PeerFinder {
Slots::Slots (DiscreteClock <DiscreteTime> clock, bool roundUpwards)
: m_clock (clock)
, m_startTime (0)
, peerCount (0)
: peerCount (0)
, inboundCount (0)
, outboundCount (0)
, fixedCount (0)
, outDesired (0)
, inboundSlots (0)
, inboundSlotsMaximum (0)
, m_clock (clock)
, m_startTime (0)
, m_roundUpwards (roundUpwards)
{
}
@@ -119,5 +119,20 @@ void Slots::updateConnected ()
}
}
void Slots::onWrite (PropertyStream::Map& map)
{
map ["peers"] = peerCount;
map ["in"] = inboundCount;
map ["out"] = outboundCount;
map ["fixed"] = fixedCount;
map ["out_desired"] = outDesired;
map ["in_avail"] = inboundSlots;
map ["in_max"] = inboundSlotsMaximum;
map ["round"] = roundUpwards();
map ["connected"] = connected();
map ["uptime"] =
RelativeTime (uptimeSeconds()).getDescription ().toStdString();
}
}
}

View File

@@ -70,11 +70,14 @@ public:
//
uint32 uptimeSeconds () const;
// Output statistics
void onWrite (PropertyStream::Map& map);
private:
void updateConnected();
DiscreteTime m_startTime;
DiscreteClock <DiscreteTime> m_clock;
DiscreteTime m_startTime;
bool m_roundUpwards;
};

View File

@@ -23,6 +23,10 @@ namespace PeerFinder {
class PeerFinderTests : public UnitTest
{
public:
//--------------------------------------------------------------------------
// Complete Logic used for tests
//
class TestLogic

View File

@@ -55,6 +55,10 @@ enum
// This should be a small multiple of the broadcast frequency
,cacheSecondsToLive = 60
// The maximum number of hops that we allow. Peers farther
// away than this are dropped.
,maxPeerHopCount = 10
//---------------------------------------------------------
//
// LegacyEndpoint Settings

View File

@@ -48,6 +48,8 @@ using namespace beast;
# include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
# include "impl/CachedEndpoint.h"
# include "impl/GiveawaysAtHop.h"
# include "impl/Giveaways.h"
#include "impl/Cache.h"
#include "impl/Slots.h"
#include "impl/Source.h"
@@ -57,8 +59,8 @@ using namespace beast;
# include "impl/LegacyEndpointCache.h"
# include "impl/PeerInfo.h"
#include "impl/StoreSqdb.h"
# include "impl/LogicType.h"
#include "impl/Logic.h"
# include "impl/Logic.h"
#include "impl/LogicType.h"
#include "impl/Checker.cpp"
#include "impl/Config.cpp"

View File

@@ -117,15 +117,14 @@ public:
PeerFinder::Config config;
#if RIPPLE_USE_PEERFINDER
config.maxPeerCount = 100;
config.maxPeerCount = getConfig ().PEERS_MAX;
#endif
config.wantIncoming =
(! getConfig ().PEER_PRIVATE) &&
(getConfig().peerListeningPort != 0);
if (config.wantIncoming)
config.listeningPort = getConfig().peerListeningPort;
config.listeningPort = getConfig().peerListeningPort;
// if it's a private peer or we are running as standalone
// automatic connections would defeat the purpose.

View File

@@ -78,6 +78,7 @@ Config::Config ()
PEER_CONNECT_LOW_WATER = DEFAULT_PEER_CONNECT_LOW_WATER;
PEER_PRIVATE = false;
PEERS_MAX = DEFAULT_PEERS_MAX;
TRANSACTION_FEE_BASE = DEFAULT_FEE_DEFAULT;
@@ -312,6 +313,9 @@ void Config::load ()
if (SectionSingleB (secConfig, SECTION_PEER_PRIVATE, strTemp))
PEER_PRIVATE = lexicalCastThrow <bool> (strTemp);
if (SectionSingleB (secConfig, SECTION_PEERS_MAX, strTemp))
PEERS_MAX = std::max (1, lexicalCastThrow <int> (strTemp));
smtTmp = SectionEntries (secConfig, SECTION_RPC_ADMIN_ALLOW);
if (smtTmp)

View File

@@ -53,6 +53,9 @@ const int SYSTEM_WEBSOCKET_PUBLIC_PORT = 6563; // XXX Going away.
// Might connect with fewer for testing.
#define DEFAULT_PEER_CONNECT_LOW_WATER 10
// The maximum number of peers to allow.
#define DEFAULT_PEERS_MAX 100
#define DEFAULT_PATH_SEARCH_OLD 7
#define DEFAULT_PATH_SEARCH 7
#define DEFAULT_PATH_SEARCH_FAST 2
@@ -395,6 +398,7 @@ public:
int PEER_START_MAX;
unsigned int PEER_CONNECT_LOW_WATER;
bool PEER_PRIVATE; // True to ask peers not to relay current IP.
unsigned int PEERS_MAX;
// Websocket networking parameters
std::string WEBSOCKET_PUBLIC_IP; // XXX Going away. Merge with the inbound peer connction.

View File

@@ -61,6 +61,7 @@ struct ConfigSection
#define SECTION_PEER_PORT "peer_port"
#define SECTION_PEER_PROXY_PORT "peer_port_proxy"
#define SECTION_PEER_PRIVATE "peer_private"
#define SECTION_PEERS_MAX "peers_max"
#define SECTION_PEER_SCAN_INTERVAL_MIN "peer_scan_interval_min"
#define SECTION_PEER_SSL_CIPHER_LIST "peer_ssl_cipher_list"
#define SECTION_PEER_START_MAX "peer_start_max"