PeerFinder work

This commit is contained in:
Nik Bougalis
2013-10-15 17:15:49 -07:00
committed by Vinnie Falco
parent 1a6d72b14c
commit 1c41dae51c
17 changed files with 444 additions and 120 deletions

View File

@@ -35,6 +35,9 @@ struct Config
/** True if we want to accept incoming connections. */
bool wantIncoming;
/** True if we want to establish connections automatically */
bool connectAutomatically;
uint16 listeningPort;
std::string featureList;
};

View File

@@ -17,8 +17,8 @@
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED
#define RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED
#ifndef RIPPLE_PEERFINDER_CACHE_H_INCLUDED
#define RIPPLE_PEERFINDER_CACHE_H_INCLUDED
namespace ripple {
namespace PeerFinder {
@@ -33,23 +33,11 @@ private:
Journal m_journal;
Table m_now;
Table m_prev;
Table m_endpoints;
// Refresh the existing entry with a new message
void refresh (CachedEndpoint& entry, Endpoint const& message)
{
entry.message.hops = std::min (entry.message.hops, message.hops);
// Copy the other fields based on uptime
if (entry.message.uptimeMinutes < message.uptimeMinutes)
{
entry.message.incomingSlotsAvailable = message.incomingSlotsAvailable;
entry.message.incomingSlotsMax = message.incomingSlotsMax;
entry.message.uptimeMinutes = message.uptimeMinutes;
entry.message.featureList = message.featureList;
}
}
// Tracks all the cached endpoints stored in the endpoint table
// in oldest-to-newest order. The oldest item is at the head.
List <CachedEndpoint> m_list;
public:
explicit Cache (Journal journal)
@@ -63,30 +51,67 @@ public:
std::size_t size() const
{
return m_now.size() + m_prev.size();
return m_endpoints.size();
}
// Cycle the tables
void cycle()
void cycle(DiscreteTime now)
{
std::swap (m_now, m_prev);
m_now.clear();
List <CachedEndpoint>::iterator iter (m_list.begin());
while (iter != m_list.end())
{
if (iter->whenExpires > now)
break;
CachedEndpoint &ep (*iter);
// We need to remove the entry from the list before
// we remove it from the table.
iter = m_list.erase(iter);
m_journal.debug << "Cache entry for " <<
ep.message.address << " expired.";
m_endpoints.erase (ep.message.address);
}
}
// Insert or update an existing entry with the new message
void insert (Endpoint const& message)
void insert (Endpoint const& message, DiscreteTime now)
{
Table::iterator iter (m_prev.find (message.address));
if (iter != m_prev.end())
{
}
else
{
std::pair <Table::iterator, bool> result (
m_now.emplace (message.address, message));
if (!result.second)
refresh (result.first->second, message);
std::pair <Table::iterator, bool> result (
m_endpoints.emplace (message.address, CachedEndpoint(message, now)));
if (!result.second)
{ // There was already an entry for this endpoint. Update it.
CachedEndpoint& entry (result.first->second);
entry.message.hops = std::min (entry.message.hops, message.hops);
// Copy the other fields based on uptime
if (entry.message.uptimeMinutes < message.uptimeMinutes)
{
entry.message.incomingSlotsAvailable = message.incomingSlotsAvailable;
entry.message.incomingSlotsMax = message.incomingSlotsMax;
entry.message.uptimeMinutes = message.uptimeMinutes;
entry.message.featureList = message.featureList;
}
entry.whenExpires = now + cacheSecondsToLive;
// It must already be in the list. Remove it in preparation.
m_list.erase (m_list.iterator_to(entry));
}
CachedEndpoint& entry (result.first->second);
m_journal.debug << "Cache entry for " << message.address <<
" is valid until " << entry.whenExpires <<
" (" << entry.message.incomingSlotsAvailable <<
"/" << entry.message.incomingSlotsMax << ")";
m_list.push_back (entry);
}
};

View File

@@ -23,14 +23,16 @@
namespace ripple {
namespace PeerFinder {
struct CachedEndpoint
struct CachedEndpoint : public List<CachedEndpoint>::Node
{
CachedEndpoint (Endpoint const& message_)
CachedEndpoint (Endpoint const& message_, DiscreteTime now)
: message (message_)
, whenExpires(now + cacheSecondsToLive)
{
}
Endpoint message;
DiscreteTime whenExpires;
};
}

View File

@@ -23,6 +23,12 @@
namespace ripple {
namespace PeerFinder {
// Ensures that all Logic member function entry points are
// called while holding a lock on the recursive mutex.
//
typedef ScopedWrapperContext <
RecursiveMutex, RecursiveMutex::ScopedLockType> SerializedContext;
/** Adapts a ServiceQueue to dispatch Checker handler completions.
This lets the Logic have its Checker handler get dispatched
on the ServiceQueue instead of an io_service thread. Otherwise,
@@ -31,30 +37,40 @@ namespace PeerFinder {
class CheckerAdapter : public Checker
{
private:
SerializedContext& m_context;
ServiceQueue& m_queue;
ScopedPointer <Checker> m_checker;
struct Handler
{
ServiceQueue* m_queue;
SerializedContext& m_context;
ServiceQueue& m_queue;
AbstractHandler <void (Checker::Result)> m_handler;
Handler (
SerializedContext& context,
ServiceQueue& queue,
AbstractHandler <void (Checker::Result)> handler)
: m_queue (&queue)
: m_context (context)
, m_queue (queue)
, m_handler (handler)
{ }
void operator() (Checker::Result result)
{
m_queue->wrap (m_handler) (result);
// VFALCO TODO Fix this, it is surely wrong but
// this supposedly correct line doesn't compile
//m_queue.wrap (m_context.wrap (m_handler)) (result);
// WRONG
m_queue.wrap (m_handler) (result);
}
};
public:
explicit CheckerAdapter (ServiceQueue& queue)
: m_queue (queue)
CheckerAdapter (SerializedContext& context, ServiceQueue& queue)
: m_context (context)
, m_queue (queue)
, m_checker (Checker::New())
{
}
@@ -73,7 +89,8 @@ public:
void async_test (IPEndpoint const& endpoint,
AbstractHandler <void (Checker::Result)> handler)
{
m_checker->async_test (endpoint, Handler (m_queue, handler));
m_checker->async_test (endpoint, Handler (
m_context, m_queue, handler));
}
};

View File

@@ -23,6 +23,7 @@ namespace PeerFinder {
Config::Config ()
: maxPeerCount (20)
, wantIncoming (false)
, connectAutomatically (false)
, listeningPort (0)
{
}

View File

@@ -26,22 +26,25 @@ namespace PeerFinder {
struct LegacyEndpoint
{
LegacyEndpoint ()
: checked (false)
: whenInserted (0)
, lastGet(0)
,checked (false)
, canAccept (false)
{ }
LegacyEndpoint (IPEndpoint const& address_)
LegacyEndpoint (IPEndpoint const& address_, DiscreteTime now)
: address (address_)
, whenInserted (RelativeTime::fromStartup())
, whenInserted (now)
, lastGet(0)
{ }
IPEndpoint address;
// When we inserted the endpoint into the cache
RelativeTime mutable whenInserted;
DiscreteTime mutable whenInserted;
// When we last used the endpoint for outging connection attempts
RelativeTime mutable lastGet;
DiscreteTime mutable lastGet;
// True if we ever tried to connect
bool mutable checked;

View File

@@ -164,7 +164,7 @@ public:
}
/** Load the legacy endpoints cache from the database. */
void load ()
void load (DiscreteTime now)
{
typedef std::vector <IPEndpoint> List;
List list;
@@ -173,7 +173,7 @@ public:
for (List::const_iterator iter (list.begin());
iter != list.end(); ++iter)
{
std::pair <LegacyEndpoint const&, bool> result (insert (*iter));
std::pair <LegacyEndpoint const&, bool> result (insert (*iter, now));
if (result.second)
++n;
}
@@ -186,10 +186,10 @@ public:
The return value provides a reference to the new or existing endpoint.
The bool indicates whether or not the insertion took place.
*/
std::pair <LegacyEndpoint const&, bool> insert (IPEndpoint const& address)
std::pair <LegacyEndpoint const&, bool> insert (IPEndpoint const& address, DiscreteTime now)
{
std::pair <MapType::iterator, bool> result (
m_map.insert (LegacyEndpoint (address)));
m_map.insert (LegacyEndpoint (address, now)));
if (m_map.size() > legacyEndpointCacheSize)
prune();
if (result.second)
@@ -224,13 +224,12 @@ public:
Also updates the lastGet field of the LegacyEndpoint so we will avoid
re-using the address until we have tried all the others.
*/
void get (std::size_t n, std::vector <IPEndpoint>& result) const
void get (std::size_t n, std::vector <IPEndpoint>& result, DiscreteTime now) const
{
FlattenedList list (flatten());
std::random_shuffle (list.begin(), list.end());
std::sort (list.begin(), list.end(), GetLess());
n = std::min (n, list.size());
RelativeTime const now (RelativeTime::fromStartup());
for (FlattenedList::iterator iter (list.begin());
n-- && iter!=list.end(); ++iter)
{

View File

@@ -140,7 +140,7 @@ public:
//
void load ()
{
m_legacyCache.load();
m_legacyCache.load (get_now());
}
// Returns a suitable Endpoint representing us.
@@ -172,10 +172,14 @@ public:
return true;
}
// Make outgoing connections to bring us up to desired out count
// If configured to make outgoing connections, do us in order
// to bring us up to desired out count.
//
void makeOutgoingConnections ()
{
if (m_config.connectAutomatically)
return;
std::vector <IPEndpoint> list;
if (m_slots.outDesired > m_slots.outboundCount)
@@ -183,7 +187,7 @@ public:
int const needed (std::min (
m_slots.outDesired - m_slots.outboundCount,
int (maxAddressesPerAttempt)));
m_legacyCache.get (needed, list);
m_legacyCache.get (needed, list, get_now());
}
if (m_fixedPeersConnected < m_fixedPeers.size())
@@ -200,10 +204,16 @@ public:
}
}
#if RIPPLE_USE_PEERFINDER
if (! list.empty())
m_callback.connectPeerEndpoints (list);
#endif
}
// Returns the number of seconds that have elapsed since some baseline
// event.
//
virtual DiscreteTime get_now()
{
return 0;
}
//--------------------------------------------------------------------------
@@ -245,16 +255,21 @@ public:
m_sources.push_back (source);
}
// Called periodically to cycle and age the varioous caches.
// Called periodically to cycle and age the various caches.
//
void cycleCache()
{
m_cache.cycle();
m_cache.cycle(get_now());
for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter)
iter->received.cycle();
}
void onPeerConnecting ()
{
}
// Called when a peer connection is established.
// We are guaranteed that the PeerID is not already in our map.
// but we are *NOT* guaranteed that the IP isn't. So we need
@@ -269,7 +284,7 @@ public:
std::pair <Peers::iterator, bool> result (
m_peers.insert (
PeerInfo (id, address, inbound)));
PeerInfo (id, address, inbound, get_now())));
bassert (result.second);
m_slots.addPeer (m_config, inbound);
}
@@ -349,9 +364,9 @@ public:
{
if (! m_peers.empty())
{
m_journal.trace << "Sending mtENDPOINTS";
m_journal.trace << "Sending endpoints to our peers";
RelativeTime const now (RelativeTime::fromStartup());
DiscreteTime const now (get_now());
for (Peers::iterator iter (m_peers.begin());
iter != m_peers.end(); ++iter)
@@ -361,7 +376,7 @@ public:
{
sendEndpoints (peer);
peer.whenSendEndpoints = now +
RelativeTime (secondsPerMessage);
secondsPerMessage;
}
}
}
@@ -380,6 +395,9 @@ public:
{
PeerInfo const& peer (*iter);
// Mark that a check for this peer is finished.
peer.connectivityCheckInProgress = false;
if (! result.error)
{
peer.checked = true;
@@ -418,13 +436,13 @@ public:
Peers::iterator iter (m_peers.find (id));
bassert (iter != m_peers.end());
RelativeTime const now (RelativeTime::fromStartup());
DiscreteTime const now (get_now());
PeerInfo const& peer (*iter);
pruneEndpoints (peer.address.to_string(), list);
// Log at higher severity if this is the first time
m_journal.stream (peer.whenAcceptEndpoints.isZero() ?
m_journal.stream (peer.whenAcceptEndpoints == 0 ?
Journal::kInfo : Journal::kTrace) <<
"Received " << list.size() <<
" endpoints from " << peer.address;
@@ -439,6 +457,9 @@ public:
m_callback.chargePeerLoadPenalty(id);
}
m_journal.debug << "Peer " << peer.address <<
" sent us " << list.size() << " endpoints.";
// Process each entry
//
int neighborCount (0);
@@ -450,16 +471,27 @@ public:
// Remember that this peer gave us this address
peer.received.insert (message.address);
m_journal.debug << "Received peer " << message.address <<
" at " << message.hops << " hops.";
if (message.hops == 0)
{
++neighborCount;
if (neighborCount == 1)
{
if (! peer.checked)
if (peer.connectivityCheckInProgress)
{
m_journal.warning << "Connectivity check for " <<
message.address << "already in progress.";
}
else if (! peer.checked)
{
// Mark that a check for this peer is now in progress.
peer.connectivityCheckInProgress = true;
// Test the peer's listening port before
// adding it to the cache for the first time.
//
//
m_checker.async_test (message.address, bind (
&Logic::onCheckEndpoint, this, id,
message.address, _1));
@@ -475,20 +507,20 @@ public:
// listening test, else we silently drop their message
// since their listening port is misconfigured.
//
m_cache.insert (message);
m_cache.insert (message, get_now());
}
}
}
else
{
m_cache.insert (message);
m_cache.insert (message, get_now());
}
}
if (neighborCount > 1)
{
m_journal.warning << "Peer " << peer.address <<
" sent " << neighborCount << " entries with hops=0";
" sent " << neighborCount << " entries with hops=0";
// VFALCO TODO Should we apply load charges?
}
@@ -528,11 +560,13 @@ public:
if (! results.error)
{
std::size_t newEntries (0);
DiscreteTime now (get_now());
for (std::vector <IPEndpoint>::const_iterator iter (results.list.begin());
iter != results.list.end(); ++iter)
{
std::pair <LegacyEndpoint const&, bool> result (
m_legacyCache.insert (*iter));
m_legacyCache.insert (*iter, now));
if (result.second)
++newEntries;
}
@@ -558,8 +592,6 @@ public:
if (result.error == boost::asio::error::operation_aborted)
return;
RelativeTime const now (RelativeTime::fromStartup());
if (! result.error)
{
if (result.canAccept)
@@ -581,7 +613,7 @@ public:
if (! validIPEndpoint (address))
return;
std::pair <LegacyEndpoint const&, bool> result (
m_legacyCache.insert (address));
m_legacyCache.insert (address, get_now()));
if (result.second)
{
// its new

View File

@@ -0,0 +1,53 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_LOGICTYPE_H_INCLUDED
#define RIPPLE_PEERFINDER_LOGICTYPE_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Provides the Clock required by Logic's get_now().
This allows the unit tests to provide its own manual clock.
*/
template <typename Clock>
class LogicType : public Logic
{
public:
explicit LogicType (Callback& callback,
Store& store,
Checker& checker,
Journal journal)
: Logic (callback, store, checker, journal)
{
}
DiscreteTime get_now ()
{
return m_clock();
}
private:
Clock m_clock;
};
}
}
#endif

View File

@@ -185,20 +185,13 @@ public:
ServiceQueue m_queue;
Journal m_journal;
StoreSqdb m_store;
SerializedContext m_context;
CheckerAdapter m_checker;
Logic m_logic;
LogicType <SimpleMonotonicClock> m_logic;
DeadlineTimer m_connectTimer;
DeadlineTimer m_messageTimer;
DeadlineTimer m_cacheTimer;
// Ensures that all Logic member function entry points are
// called while holding a lock on the recursive mutex.
//
typedef ScopedWrapperContext <
RecursiveMutex, RecursiveMutex::ScopedLockType> SerializedContext;
SerializedContext m_context;
//--------------------------------------------------------------------------
ManagerImp (Stoppable& stoppable, Callback& callback, Journal journal)
@@ -206,7 +199,7 @@ public:
, Thread ("PeerFinder")
, m_journal (journal)
, m_store (journal)
, m_checker (m_queue)
, m_checker (m_context, m_queue)
, m_logic (callback, m_store, m_checker, journal)
, m_connectTimer (this)
, m_messageTimer (this)
@@ -267,13 +260,20 @@ public:
// VFALCO TODO This needs to be implemented
}
void onPeerConnecting ()
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerConnecting, &m_logic)));
}
void onPeerConnected (PeerID const& id,
IPEndpoint const& address, bool incoming)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerConnected, &m_logic,
id, address, incoming)));;
id, address, incoming)));
}
void onPeerDisconnected (const PeerID& id)
@@ -383,8 +383,6 @@ public:
File const file (File::getSpecialLocation (
File::userDocumentsDirectory).getChildFile ("PeerFinder.sqlite"));
m_journal.debug << "Opening database at '" << file.getFullPathName() << "'";
Error error (m_store.open (file));
if (error)
@@ -402,7 +400,9 @@ public:
m_messageTimer.setExpiration (secondsPerMessage);
m_cacheTimer.setExpiration (cacheSecondsToLive);
m_queue.post (bind (&Logic::makeOutgoingConnections, &m_logic));
m_queue.post (
m_context.wrap (
bind (&Logic::makeOutgoingConnections, &m_logic)));
}
void run ()

View File

@@ -32,14 +32,16 @@ struct PeerInfo
{
PeerInfo (PeerID const& id_,
IPEndpoint const& address_,
bool inbound_)
bool inbound_,
DiscreteTime now)
: id (id_)
, address (address_)
, inbound (inbound_)
, checked (inbound_ ? false : true)
, canAccept (inbound_ ? false : true)
, whenSendEndpoints (RelativeTime::fromStartup())
, whenAcceptEndpoints (RelativeTime::fromStartup())
, connectivityCheckInProgress (false)
, whenSendEndpoints (now)
, whenAcceptEndpoints (now)
{
}
@@ -52,17 +54,21 @@ struct PeerInfo
bool mutable checked;
// Set to indicate if the connection can receive incoming at the
// address advertised in mtENDPOINTS. Only valid if checked is true
// address advertised in mtENDPOINTS. Only valid if checked is true.
bool mutable canAccept;
// Set to indicate that a connection check for this peer is in
// progress. Valid always.
bool mutable connectivityCheckInProgress;
// The time after which we will send the peer mtENDPOINTS
RelativeTime mutable whenSendEndpoints;
DiscreteTime mutable whenSendEndpoints;
// The time after which we will accept mtENDPOINTS from the peer
// This is to prevent flooding or spamming. Receipt of mtENDPOINTS
// sooner than the allotted time should impose a load charge.
//
RelativeTime mutable whenAcceptEndpoints;
DiscreteTime mutable whenAcceptEndpoints;
// The set of all recent IPEndpoint that we have seen from this peer.
// We try to avoid sending a peer the same addresses they gave us.

View File

@@ -0,0 +1,32 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_PRIVATE_TYPES_H_INCLUDED
#define RIPPLE_PEERFINDER_PRIVATE_TYPES_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Time in seconds since some baseline event in the past. */
typedef int DiscreteTime;
}
}
#endif

View File

@@ -0,0 +1,40 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_SIMPLEMONOTONICCLOCK_H_INCLUDED
#define RIPPLE_PEERFINDER_SIMPLEMONOTONICCLOCK_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Monotonically increasing time value. */
struct SimpleMonotonicClock
{
typedef int value_type;
value_type operator() () const
{
return value_type (RelativeTime::fromStartup().inSeconds());
}
};
}
}
#endif

View File

@@ -34,30 +34,25 @@ Slots::Slots ()
void Slots::update (Config const& config)
{
if (! config.wantIncoming)
{
inboundSlots = 0;
inboundSlotsMaximum = 0;
}
double outDesiredFraction = 1;
if (config.wantIncoming)
outDesiredFraction = config.maxPeerCount * (Config::outPercent * .01);
if (m_roundUpwards)
outDesired = int (std::ceil (outDesiredFraction));
else
{
double outDesiredFraction (
config.maxPeerCount * (Config::outPercent * .01));
outDesired = int (std::floor (outDesiredFraction));
if (m_roundUpwards)
outDesired = int (std::ceil (outDesiredFraction));
else
outDesired = int (std::floor (outDesiredFraction));
if (outDesired < Config::minOutCount)
outDesired = Config::minOutCount;
if (outDesired < Config::minOutCount)
outDesired = Config::minOutCount;
if (config.maxPeerCount >= outDesired)
inboundSlotsMaximum = config.maxPeerCount - outDesired;
else
inboundSlotsMaximum = 0;
if (config.maxPeerCount >= outDesired)
inboundSlotsMaximum = config.maxPeerCount - outDesired;
else
inboundSlotsMaximum = 0;
inboundSlots = std::max (inboundSlotsMaximum - inboundCount, 0);
}
inboundSlots = std::max (inboundSlotsMaximum - inboundCount, 0);
}
void Slots::addPeer (Config const& config, bool inbound)

View File

@@ -24,13 +24,21 @@ namespace ripple {
namespace PeerFinder {
/** Database persistence for PeerFinder using SQLite */
class StoreSqdb : public Store
class StoreSqdb
: public Store
, public LeakChecked <StoreSqdb>
{
private:
Journal m_journal;
sqdb::session m_session;
public:
enum
{
// This determines the on-database format of the data
currentSchemaVersion = 2
};
explicit StoreSqdb (Journal journal = Journal())
: m_journal (journal)
{
@@ -44,9 +52,14 @@ public:
{
Error error (m_session.open (file.getFullPathName ()));
m_journal.info << "Opening database at '" << file.getFullPathName() << "'";
if (!error)
error = init ();
if (!error)
error = update ();
return error;
}
@@ -62,7 +75,7 @@ public:
if (! error)
{
m_session.once (error) <<
"SELECT COUNT(*) FROM LegacyEndpoints "
"SELECT COUNT(*) FROM PeerFinder_LegacyEndpoints "
,sqdb::into (count)
;
}
@@ -78,7 +91,7 @@ public:
{
std::string s;
sqdb::statement st = (m_session.prepare <<
"SELECT ipv4 FROM LegacyEndpoints "
"SELECT ipv4 FROM PeerFinder_LegacyEndpoints "
,sqdb::into (s)
);
@@ -110,13 +123,13 @@ public:
sqdb::transaction tr (m_session);
m_session.once (error) <<
"DELETE FROM LegacyEndpoints";
"DELETE FROM PeerFinder_LegacyEndpoints";
if (! error)
{
std::string s;
sqdb::statement st = (m_session.prepare <<
"INSERT INTO LegacyEndpoints ( "
"INSERT INTO PeerFinder_LegacyEndpoints ( "
" ipv4 "
") VALUES ( "
" ? "
@@ -145,6 +158,79 @@ public:
}
}
// Convert any existing entries from an older schema to the
// current one, if approrpriate.
//
Error update ()
{
Error error;
sqdb::transaction tr (m_session);
// get version
int version (0);
if (!error)
{
m_session.once (error) <<
"SELECT "
" version "
"FROM SchemaVersion WHERE "
" name = 'PeerFinder'"
,sqdb::into (version)
;
if (! error)
{
if (!m_session.got_data())
version = 0;
m_journal.info << "Opened version " << version << " database";
}
}
if (!error && version != currentSchemaVersion)
{
m_journal.info <<
"Updateding database to version " << currentSchemaVersion;
}
if (!error && (version < 2))
{
if (!error)
m_session.once (error) <<
"DROP TABLE IF EXISTS LegacyEndpoints";
if (!error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinderLegacyEndpoints";
}
if (!error)
{
int const version (currentSchemaVersion);
m_session.once (error) <<
"INSERT OR REPLACE INTO SchemaVersion ("
" name "
" ,version "
") VALUES ( "
" 'PeerFinder', ? "
")"
,sqdb::use(version);
}
if (!error)
error = tr.commit();
if (error)
{
tr.rollback();
report (error, __FILE__, __LINE__);
}
return error;
}
private:
Error init ()
{
@@ -160,13 +246,34 @@ private:
if (! error)
{
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS LegacyEndpoints ( "
"CREATE TABLE IF NOT EXISTS SchemaVersion ( "
" name TEXT PRIMARY KEY, "
" version INTEGER"
");"
;
}
if (! error)
{
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS PeerFinder_LegacyEndpoints ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" ipv4 TEXT UNIQUE NOT NULL "
");"
;
}
if (! error)
{
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_LegacyEndpoints_Index ON PeerFinder_LegacyEndpoints "
" ( "
" ipv4 "
" ); "
;
}
if (! error)
{
error = tr.commit();

View File

@@ -42,6 +42,8 @@ namespace ripple {
using namespace beast;
}
#include "impl/SimpleMonotonicClock.h"
#include "impl/PrivateTypes.h"
# include "impl/Tuning.h"
# include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
@@ -56,6 +58,7 @@ using namespace beast;
# include "impl/PeerInfo.h"
#include "impl/StoreSqdb.h"
#include "impl/Logic.h"
#include "impl/LogicType.h"
#include "impl/Checker.cpp"
#include "impl/Config.cpp"

View File

@@ -127,6 +127,12 @@ public:
if (config.wantIncoming)
config.listeningPort = getConfig().peerListeningPort;
// if it's a private peer or we are running as standalone
// automatic connections would defeat the purpose.
config.connectAutomatically =
!getConfig().RUN_STANDALONE &&
!getConfig().PEER_PRIVATE;
config.featureList = "";
m_peerFinder->setConfig (config);