PeerFinder fixes:

* Fix split horizon using recent address cache
* Change message tuning parameters to reduce dead messages
* Improved peer handout algorithm for addresses
* Improved handout algorithm for redirects
* Improved selection algorithm for autoconnect
* Faster autoconnection strategy
* Consolidate deadline timers
* Send empty endpoints message as a socket ping
* Fix hop count adjustments for live cache filtering
* Remove broken Peer::isConnected function
* Optimized Livecache for handouts
* Optimized Bootcache for handouts
* Remove uptime metric from Bootcache entries
* Add histogram to Livecache print output
This commit is contained in:
Vinnie Falco
2014-02-15 08:27:02 -08:00
parent bf085f0ef3
commit 995e64a205
30 changed files with 2096 additions and 1795 deletions

View File

@@ -103,6 +103,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\json\ripple_json.cpp" />
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Bootcache.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Checker.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -115,6 +121,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\ConnectHandouts.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Endpoint.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -133,6 +145,24 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\RedirectHandouts.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\SlotHandouts.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\SlotImp.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\SlotImp.h">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -2235,16 +2265,17 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Bootcache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\ConnectHandouts.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Fixed.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Giveaways.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\handout.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\iosformat.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Livecache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Logic.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\PrivateTypes.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\RedirectHandouts.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Reporting.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Seen.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Counts.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Sorts.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\SlotHandouts.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Source.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\SourceStrings.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Store.h" />

View File

@@ -1473,6 +1473,21 @@
<ClCompile Include="..\..\src\ripple_overlay\ripple_overlay.cpp">
<Filter>[2] Old Ripple\ripple_overlay</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Bootcache.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\ConnectHandouts.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\RedirectHandouts.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\SlotHandouts.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\SlotImp.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\RangeSet.h">
@@ -2913,9 +2928,6 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Giveaways.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\iosformat.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
@@ -2931,12 +2943,6 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Reporting.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Seen.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Sorts.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Source.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
@@ -2946,9 +2952,6 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Store.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\StoreSqdb.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Tuning.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
@@ -3036,6 +3039,21 @@
<ClInclude Include="..\..\src\ripple_overlay\ripple_overlay.h">
<Filter>[2] Old Ripple\ripple_overlay</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\handout.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\SlotHandouts.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\RedirectHandouts.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\ConnectHandouts.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\StoreSqdb.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -260,7 +260,7 @@ Slot properties may be combined and are not mutually exclusive.
configuration file or learned through overlay messages from other trusted
peers. Cluster slots do not count towards connection limits.
* **Superpeer** (2.0)
* **Superpeer** (forthcoming)
A superpeer slot is a connection to a peer which can accept incoming
connections, meets certain resource availaibility requirements (such as

View File

@@ -28,10 +28,14 @@ struct Endpoint
{
Endpoint ();
Endpoint (IP::Endpoint const& ep, int hops_);
int hops;
IP::Endpoint address;
};
bool operator< (Endpoint const& lhs, Endpoint const& rhs);
}
}

View File

@@ -0,0 +1,267 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "Bootcache.h"
namespace ripple {
namespace PeerFinder {
Bootcache::Bootcache (
Store& store,
clock_type& clock,
Journal journal)
: m_store (store)
, m_clock (clock)
, m_journal (journal)
, m_whenUpdate (m_clock.now ())
{
}
Bootcache::~Bootcache ()
{
update();
}
bool
Bootcache::empty() const
{
return m_map.empty();
}
Bootcache::map_type::size_type
Bootcache::size() const
{
return m_map.size();
}
Bootcache::const_iterator
Bootcache::begin() const
{
return const_iterator (m_map.right.begin());
}
Bootcache::const_iterator
Bootcache::cbegin() const
{
return const_iterator (m_map.right.begin());
}
Bootcache::const_iterator
Bootcache::end() const
{
return const_iterator (m_map.right.end());
}
Bootcache::const_iterator
Bootcache::cend() const
{
return const_iterator (m_map.right.end());
}
void
Bootcache::clear()
{
m_map.clear();
m_needsUpdate = true;
}
//--------------------------------------------------------------------------
void
Bootcache::load ()
{
clear();
auto const n (m_store.load (
[this](IP::Endpoint const& endpoint, int valence)
{
auto const result (this->m_map.insert (
value_type (endpoint, valence)));
if (! result.second)
{
if (this->m_journal.error)
this->m_journal.error << leftw (18) <<
"Bootcache discard " << endpoint;
}
}));
if (n > 0)
{
if (m_journal.info) m_journal.info << leftw (18) <<
"Bootcache loaded " << n <<
((n > 1) ? " addresses" : " address");
prune ();
}
}
bool
Bootcache::insert (IP::Endpoint const& endpoint)
{
auto const result (m_map.insert (
value_type (endpoint, 0)));
if (result.second)
{
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Bootcache insert " << endpoint;
prune ();
flagForUpdate();
}
return result.second;
}
void
Bootcache::on_success (IP::Endpoint const& endpoint)
{
auto result (m_map.insert (
value_type (endpoint, 1)));
if (result.second)
{
prune ();
}
else
{
Entry entry (result.first->right);
if (entry.valence() < 0)
entry.valence() = 0;
++entry.valence();
m_map.erase (result.first);
result = m_map.insert (
value_type (endpoint, entry));
assert (result.second);
}
Entry const& entry (result.first->right);
if (m_journal.info) m_journal.info << leftw (18) <<
"Bootcache connect " << endpoint <<
" with " << entry.valence() <<
((entry.valence() > 1) ? " successes" : " success");
flagForUpdate();
}
void
Bootcache::on_failure (IP::Endpoint const& endpoint)
{
auto result (m_map.insert (
value_type (endpoint, -1)));
if (result.second)
{
prune();
}
else
{
Entry entry (result.first->right);
if (entry.valence() > 0)
entry.valence() = 0;
--entry.valence();
m_map.erase (result.first);
result = m_map.insert (
value_type (endpoint, entry));
assert (result.second);
}
Entry const& entry (result.first->right);
auto const n (std::abs (entry.valence()));
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Bootcache failed " << endpoint <<
" with " << n <<
((n > 1) ? " attempts" : " attempt");
flagForUpdate();
}
void
Bootcache::periodicActivity ()
{
checkUpdate();
}
//--------------------------------------------------------------------------
void
Bootcache::onWrite (PropertyStream::Map& map)
{
map ["entries"] = uint32 (m_map.size());
}
// Checks the cache size and prunes if its over the limit.
void
Bootcache::prune ()
{
if (size() <= Tuning::bootcacheSize)
return;
// Calculate the amount to remove
auto count ((size() *
Tuning::bootcachePrunePercent) / 100);
decltype(count) pruned (0);
// Work backwards because bimap doesn't handle
// erasing using a reverse iterator very well.
//
for (auto iter (m_map.right.end());
count-- > 0 && iter != m_map.right.begin(); ++pruned)
{
--iter;
IP::Endpoint const& endpoint (iter->get_left());
Entry const& entry (iter->get_right());
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Bootcache pruned" << endpoint <<
" at valence " << entry.valence();
iter = m_map.right.erase (iter);
}
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Bootcache pruned " << pruned << " entries total";
}
// Updates the Store with the current set of entries if needed.
void
Bootcache::update ()
{
if (! m_needsUpdate)
return;
std::vector <Store::Entry> list;
list.reserve (m_map.size());
for (auto const& e : m_map)
{
Store::Entry se;
se.endpoint = e.get_left();
se.valence = e.get_right().valence();
list.push_back (se);
}
m_store.save (list);
// Reset the flag and cooldown timer
m_needsUpdate = false;
m_whenUpdate = m_clock.now() + Tuning::bootcacheCooldownTime;
}
// Checks the clock and calls update if we are off the cooldown.
void
Bootcache::checkUpdate ()
{
if (m_needsUpdate && m_whenUpdate < m_clock.now())
update ();
}
// Called when changes to an entry will affect the Store.
void
Bootcache::flagForUpdate ()
{
m_needsUpdate = true;
checkUpdate ();
}
}
}

View File

@@ -20,6 +20,10 @@
#ifndef RIPPLE_PEERFINDER_BOOTCACHE_H_INCLUDED
#define RIPPLE_PEERFINDER_BOOTCACHE_H_INCLUDED
#include <boost/bimap.hpp>
#include <boost/bimap/multiset_of.hpp>
#include <boost/bimap/unordered_set_of.hpp>
namespace ripple {
namespace PeerFinder {
@@ -29,13 +33,7 @@ namespace PeerFinder {
connections are needed. Along with the address, each entry has this
additional metadata:
Uptime
The number of seconds that the address has maintained an active
peer connection, cumulative, without a connection attempt failure.
Valence
A signed integer which represents the number of successful
consecutive connection attempts when positive, and the number of
failed consecutive connection attempts when negative.
@@ -46,33 +44,18 @@ namespace PeerFinder {
*/
class Bootcache
{
public:
/** An item used for connecting. */
class Endpoint
private:
class Entry
{
public:
Endpoint ()
: m_uptime (0)
, m_valence (0)
Entry (int valence)
: m_valence (valence)
{
}
Endpoint (IP::Endpoint const& address,
std::chrono::seconds uptime, int valence)
: m_address (address)
, m_uptime (uptime)
, m_valence (valence)
int& valence ()
{
}
IP::Endpoint const& address () const
{
return m_address;
}
std::chrono::seconds uptime () const
{
return m_uptime;
return m_valence;
}
int valence () const
@@ -80,106 +63,40 @@ public:
return m_valence;
}
private:
IP::Endpoint m_address;
std::chrono::seconds m_uptime;
int m_valence;
};
typedef std::vector <Endpoint> Endpoints;
//--------------------------------------------------------------------------
/** An entry in the bootstrap cache. */
struct Entry
{
Entry ()
: cumulativeUptime (0)
, sessionUptime (0)
, connectionValence (0)
, active (false)
friend bool operator< (Entry const& lhs, Entry const& rhs)
{
}
/** Update the uptime measurement based on the time. */
void update (clock_type::time_point const& now)
{
// Must be active!
assert (active);
// Clock must be monotonically increasing
assert (now >= whenActive);
// Remove the uptime we added earlier in the
// session and add back in the new uptime measurement.
auto const uptime (now - whenActive);
cumulativeUptime -= sessionUptime;
cumulativeUptime += uptime;
sessionUptime = uptime;
}
/** Our cumulative uptime with this address with no failures. */
std::chrono::seconds cumulativeUptime;
/** Amount of uptime from the current session (if any). */
std::chrono::seconds sessionUptime;
/** Number of consecutive connection successes or failures.
If the number is positive, indicates the number of
consecutive successful connection attempts, else the
absolute value indicates the number of consecutive
connection failures.
*/
int connectionValence;
/** `true` if the peer has handshaked and is currently connected. */
bool active;
/** Time when the peer became active. */
clock_type::time_point whenActive;
};
//--------------------------------------------------------------------------
/* Comparison function for entries.
1. Sort descending by cumulative uptime
2. For all uptimes == 0,
Sort descending by connection successes
3. For all successes == 0
Sort ascending by number of failures
*/
struct Less
{
template <typename Iter>
bool operator() (
Iter const& lhs_iter, Iter const& rhs_iter)
{
Entry const& lhs (lhs_iter->second);
Entry const& rhs (rhs_iter->second);
// Higher cumulative uptime always wins
if (lhs.cumulativeUptime > rhs.cumulativeUptime)
return true;
else if (lhs.cumulativeUptime <= rhs.cumulativeUptime
&& rhs.cumulativeUptime.count() != 0)
return false;
// At this point both uptimes will be zero
consistency_check (lhs.cumulativeUptime.count() == 0 &&
rhs.cumulativeUptime.count() == 0);
if (lhs.connectionValence > rhs.connectionValence)
if (lhs.valence() > rhs.valence())
return true;
return false;
}
private:
int m_valence;
};
//--------------------------------------------------------------------------
typedef boost::bimaps::unordered_set_of <IP::Endpoint> left_t;
typedef boost::bimaps::multiset_of <Entry> right_t;
typedef boost::bimap <left_t, right_t> map_type;
typedef map_type::value_type value_type;
typedef std::unordered_map <IP::Endpoint, Entry> Entries;
struct Transform : std::unary_function <
map_type::right_map::const_iterator::value_type const&,
IP::Endpoint const&>
{
IP::Endpoint const& operator() (
map_type::right_map::
const_iterator::value_type const& v) const
{
return v.get_left();
}
};
typedef std::vector <Entries::iterator> SortedEntries;
private:
map_type m_map;
Store& m_store;
clock_type& m_clock;
Journal m_journal;
Entries m_entries;
// Time after which we can update the database again
clock_type::time_point m_whenUpdate;
@@ -187,367 +104,57 @@ public:
// Set to true when a database update is needed
bool m_needsUpdate;
public:
typedef boost::transform_iterator <Transform,
map_type::right_map::const_iterator> iterator;
typedef iterator const_iterator;
Bootcache (
Store& store,
clock_type& clock,
Journal journal)
: m_store (store)
, m_clock (clock)
, m_journal (journal)
, m_whenUpdate (m_clock.now ())
{
}
Journal journal);
~Bootcache ()
{
update ();
}
~Bootcache ();
//--------------------------------------------------------------------------
/** Load the persisted data from the Store into the container. */
void load ()
{
typedef std::vector <Store::SavedBootstrapAddress> StoredData;
StoredData const list (m_store.loadBootstrapCache ());
std::size_t count (0);
for (StoredData::const_iterator iter (list.begin());
iter != list.end(); ++iter)
{
std::pair <Entries::iterator, bool> result (
m_entries.emplace (std::piecewise_construct,
std::forward_as_tuple (iter->address),
std::make_tuple ()));
if (result.second)
{
++count;
Entry& entry (result.first->second);
entry.cumulativeUptime = iter->cumulativeUptime;
entry.connectionValence = iter->connectionValence;
}
else
{
if (m_journal.error) m_journal.error << leftw (18) <<
"Bootcache discard " << iter->address;
}
}
if (count > 0)
{
if (m_journal.info) m_journal.info << leftw (18) <<
"Bootcache loaded " << count <<
((count > 1) ? " addresses" : " address");
}
prune ();
}
/** Returns `true` if the cache is empty. */
bool empty() const;
/** Returns the number of entries in the cache. */
std::size_t size () const
{
return m_entries.size();
}
map_type::size_type size() const;
/** Returns up to the specified number of the best addresses. */
IPAddresses getAddresses (int n)
{
SortedEntries const list (sort());
IPAddresses result;
int count (0);
result.reserve (n);
for (SortedEntries::const_iterator iter (
list.begin()); ++count <= n && iter != list.end(); ++iter)
result.push_back ((*iter)->first);
consistency_check (result.size() <= n);
return result;
}
/** IP::Endpoint iterators that traverse in decreasing valence. */
/** @{ */
const_iterator begin() const;
const_iterator cbegin() const;
const_iterator end() const;
const_iterator cend() const;
void clear();
/** @} */
/** Returns all entries in the cache. */
Endpoints fetch () const
{
Endpoints result;
result.reserve (m_entries.size ());
for (Entries::const_iterator iter (m_entries.begin ());
iter != m_entries.end (); ++iter)
result.emplace_back (iter->first,
iter->second.cumulativeUptime,
iter->second.connectionValence);
return result;
}
/** Load the persisted data from the Store into the container. */
void load ();
/** Called periodically to perform time related tasks. */
void periodicActivity ()
{
checkUpdate();
}
/** Called when an address is learned from a message. */
bool insert (IP::Endpoint const& address)
{
std::pair <Entries::iterator, bool> result (
m_entries.emplace (std::piecewise_construct,
std::forward_as_tuple (address),
std::make_tuple ()));
if (result.second)
{
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Bootcache insert " << address;
prune ();
flagForUpdate();
}
return result.second;
}
/** Called when an outbound connection attempt fails to handshake. */
void onConnectionFailure (IP::Endpoint const& address)
{
Entries::iterator iter (m_entries.find (address));
// If the entry doesn't already exist don't bother remembering
// it since the connection failed.
//
if (iter == m_entries.end())
return;
Entry& entry (iter->second);
// Reset cumulative uptime to zero. We are aggressive
// with resetting uptime to prevent the entire network
// from settling on just a handful of addresses.
//
entry.cumulativeUptime = std::chrono::seconds (0);
entry.sessionUptime = std::chrono::seconds (0);
// Increment the number of consecutive failures.
if (entry.connectionValence > 0)
entry.connectionValence = 0;
--entry.connectionValence;
int const count (std::abs (entry.connectionValence));
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Bootcache failed " << address <<
" with " << count <<
((count > 1) ? " attempts" : " attempt");
flagForUpdate();
}
/** Add the address to the cache. */
bool insert (IP::Endpoint const& endpoint);
/** Called when an outbound connection handshake completes. */
void onConnectionHandshake (IP::Endpoint const& address,
HandshakeAction action)
{
std::pair <Entries::iterator, bool> result (
m_entries.emplace (std::piecewise_construct,
std::forward_as_tuple (address),
std::make_tuple ()));
Entry& entry (result.first->second);
// Can't already be active!
consistency_check (! entry.active);
// Reset session uptime
entry.sessionUptime = std::chrono::seconds (0);
// Count this as a connection success
if (entry.connectionValence < 0)
entry.connectionValence = 0;
++entry.connectionValence;
// Update active status
if (action == doActivate)
{
entry.active = true;
entry.whenActive = m_clock.now();
}
else
{
entry.active = false;
}
// Prune if we made the container larger
if (result.second)
prune ();
flagForUpdate();
if (m_journal.info) m_journal.info << leftw (18) <<
"Bootcache connect " << address <<
" with " << entry.connectionValence <<
((entry.connectionValence > 1) ? " successes" : " success");
}
void on_success (IP::Endpoint const& endpoint);
/** Called periodically while the peer is active. */
//
// VFALCO TODO Can't we just put the active ones into an intrusive list
// and update their uptime in periodicActivity() now that
// we have the m_clock member?
//
void onConnectionActive (IP::Endpoint const& address)
{
std::pair <Entries::iterator, bool> result (
m_entries.emplace (std::piecewise_construct,
std::forward_as_tuple (address),
std::make_tuple ()));
// Must exist!
consistency_check (! result.second);
Entry& entry (result.first->second);
entry.update (m_clock.now());
flagForUpdate();
}
/** Called when an outbound connection attempt fails to handshake. */
void on_failure (IP::Endpoint const& endpoint);
template <class Rep, class Period>
static std::string uptime_phrase (
std::chrono::duration <Rep, Period> const& elapsed)
{
if (elapsed.count() > 0)
{
std::stringstream ss;
ss << " with " << elapsed << " uptime";
return ss.str();
}
return std::string ();
}
/** Called when an active outbound connection closes. */
void onConnectionClosed (IP::Endpoint const& address)
{
Entries::iterator iter (m_entries.find (address));
// Must exist!
consistency_check (iter != m_entries.end());
Entry& entry (iter->second);
// Must be active!
consistency_check (entry.active);
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Bootcache close " << address <<
uptime_phrase (entry.cumulativeUptime);
entry.update (m_clock.now());
entry.sessionUptime = std::chrono::seconds (0);
entry.active = false;
flagForUpdate();
}
/** Stores the cache in the persistent database on a timer. */
void periodicActivity ();
//--------------------------------------------------------------------------
//
// Diagnostics
//
//--------------------------------------------------------------------------
/** Write the cache state to the property stream. */
void onWrite (PropertyStream::Map& map);
void onWrite (PropertyStream::Map& map)
{
map ["entries"] = uint32(m_entries.size());
}
static std::string valenceString (int valence)
{
std::stringstream ss;
if (valence >= 0)
ss << '+';
ss << valence;
return ss.str();
}
void dump (Journal::ScopedStream const& ss) const
{
std::vector <Entries::const_iterator> const list (csort ());
ss << std::endl << std::endl <<
"Bootcache (size " << list.size() << ")";
for (std::vector <Entries::const_iterator>::const_iterator iter (
list.begin()); iter != list.end(); ++iter)
{
ss << std::endl <<
(*iter)->first << ", " <<
(*iter)->second.cumulativeUptime << ", "
<< valenceString ((*iter)->second.connectionValence);
if ((*iter)->second.active)
ss <<
", active";
}
}
//--------------------------------------------------------------------------
private:
// Returns a vector of entry iterators sorted by descending score
std::vector <Entries::const_iterator> csort () const
{
std::vector <Entries::const_iterator> result;
result.reserve (m_entries.size());
for (Entries::const_iterator iter (m_entries.begin());
iter != m_entries.end(); ++iter)
result.push_back (iter);
std::random_shuffle (result.begin(), result.end());
// should be std::unstable_sort (c++11)
std::sort (result.begin(), result.end(), Less());
return result;
}
// Returns a vector of entry iterators sorted by descending score
std::vector <Entries::iterator> sort ()
{
std::vector <Entries::iterator> result;
result.reserve (m_entries.size());
for (Entries::iterator iter (m_entries.begin());
iter != m_entries.end(); ++iter)
result.push_back (iter);
std::random_shuffle (result.begin(), result.end());
// should be std::unstable_sort (c++11)
std::sort (result.begin(), result.end(), Less());
return result;
}
// Checks the cache size and prunes if its over the limit.
void prune ()
{
if (m_entries.size() <= Tuning::bootcacheSize)
return;
// Calculate the amount to remove
int count ((m_entries.size() *
Tuning::bootcachePrunePercent) / 100);
int pruned (0);
SortedEntries list (sort ());
for (SortedEntries::const_reverse_iterator iter (
list.rbegin()); count > 0 && iter != list.rend(); ++iter)
{
Entry& entry ((*iter)->second);
// skip active entries
if (entry.active)
continue;
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Bootcache pruned" << (*iter)->first <<
uptime_phrase (entry.cumulativeUptime) <<
" and valence " << entry.connectionValence;
m_entries.erase (*iter);
--count;
++pruned;
}
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Bootcache pruned " << pruned << " entries total";
}
// Updates the Store with the current set of entries if needed.
void update ()
{
if (! m_needsUpdate)
return;
typedef std::vector <Store::SavedBootstrapAddress> StoredData;
StoredData list;
list.reserve (m_entries.size());
for (Entries::const_iterator iter (m_entries.begin());
iter != m_entries.end(); ++iter)
{
Store::SavedBootstrapAddress entry;
entry.address = iter->first;
entry.cumulativeUptime = iter->second.cumulativeUptime;
entry.connectionValence = iter->second.connectionValence;
list.push_back (entry);
}
m_store.updateBootstrapCache (list);
// Reset the flag and cooldown timer
m_needsUpdate = false;
m_whenUpdate = m_clock.now() + Tuning::bootcacheCooldownTime;
}
// Checks the clock and calls update if we are off the cooldown.
void checkUpdate ()
{
if (m_needsUpdate && m_whenUpdate < m_clock.now())
update ();
}
// Called when changes to an entry will affect the Store.
void flagForUpdate ()
{
m_needsUpdate = true;
checkUpdate ();
}
void prune ();
void update ();
void checkUpdate ();
void flagForUpdate ();
};
}

View File

@@ -0,0 +1,64 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "ConnectHandouts.h"
namespace ripple {
namespace PeerFinder {
ConnectHandouts::ConnectHandouts (
std::size_t needed, Squelches& squelches)
: m_needed (needed)
, m_squelches (squelches)
{
m_list.reserve (needed);
}
bool
ConnectHandouts::try_insert (IP::Endpoint const& endpoint)
{
if (full ())
return false;
// Make sure the address isn't already in our list
if (std::any_of (m_list.begin(), m_list.end(),
[&endpoint](IP::Endpoint const& other)
{
// Ignore port for security reasons
return other.address() ==
endpoint.address();
}))
{
return false;
}
// Add to squelch list so we don't try it too often.
// If its already there, then make try_insert fail.
auto const result (m_squelches.insert (
endpoint.address()));
if (! result.second)
return false;
m_list.push_back (endpoint);
return true;
}
}
}

View File

@@ -0,0 +1,79 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_CONNECTHANDOUTS_H_INCLUDED
#define RIPPLE_PEERFINDER_CONNECTHANDOUTS_H_INCLUDED
#include "Tuning.h"
#include "../../../beast/beast/container/aged_set.h"
namespace ripple {
namespace PeerFinder {
/** Receives handouts for making automatic connections. */
class ConnectHandouts
{
public:
// Keeps track of addresses we have made outgoing connections
// to, for the purposes of not connecting to them too frequently.
typedef beast::aged_set <IP::Address> Squelches;
typedef std::vector <IP::Endpoint> list_type;
private:
std::size_t m_needed;
Squelches& m_squelches;
list_type m_list;
public:
ConnectHandouts (std::size_t needed, Squelches& squelches);
bool empty() const
{
return m_list.empty();
}
bool full() const
{
return m_list.size() >= m_needed;
}
bool try_insert (Endpoint const& endpoint)
{
return try_insert (endpoint.address);
}
list_type& list()
{
return m_list;
}
list_type const& list() const
{
return m_list;
}
bool try_insert (IP::Endpoint const& endpoint);
};
}
}
#endif

View File

@@ -27,5 +27,16 @@ Endpoint::Endpoint ()
{
}
Endpoint::Endpoint (IP::Endpoint const& ep, int hops_)
: hops (hops_)
, address (ep)
{
}
bool operator< (Endpoint const& lhs, Endpoint const& rhs)
{
return lhs.address < rhs.address;
}
}
}

View File

@@ -1,150 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_GIVEAWAYS_H_INCLUDED
#define RIPPLE_PEERFINDER_GIVEAWAYS_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Holds a rotating set of endpoint messages to give away. */
class Giveaways
{
public:
typedef std::vector <Endpoint const*> Bucket;
typedef boost::array <Bucket, Tuning::maxHops + 1> Buckets;
Endpoints m_endpoints;
std::size_t m_remain;
Buckets m_buckets;
void prepare ()
{
for (Buckets::iterator iter (m_buckets.begin());
iter != m_buckets.end(); ++iter)
iter->reserve (m_endpoints.size ());
}
public:
bool is_consistent ()
{
// Make sure the counts add up
std::size_t count (0);
for (Buckets::const_iterator iter (m_buckets.begin());
iter != m_buckets.end(); ++iter)
count += iter->size();
return count == m_remain;
}
void refill ()
{
// Empty out the buckets
for (Buckets::iterator iter (m_buckets.begin());
iter != m_buckets.end(); ++iter)
iter->clear();
// Put endpoints back into buckets
for (Endpoints::const_iterator iter (m_endpoints.begin());
iter != m_endpoints.end(); ++iter)
{
Endpoint const& ep (*iter);
consistency_check (ep.hops <= Tuning::maxHops);
m_buckets [ep.hops].push_back (&ep);
}
// Shuffle the buckets
for (Buckets::iterator iter (m_buckets.begin());
iter != m_buckets.end(); ++iter)
std::random_shuffle (iter->begin(), iter->end());
m_remain = m_endpoints.size();
consistency_check (is_consistent ());
}
public:
explicit Giveaways (Endpoints const& endpoints)
: m_endpoints (endpoints)
, m_remain (0)
{
prepare();
}
#if BEAST_COMPILER_SUPPORTS_MOVE_SEMANTICS
Giveaways (Endpoints&& endpoints)
: m_endpoints (endpoints)
, m_remain (0)
{
prepare();
}
#endif
/** Append up to `n` Endpoint to the specified container.
The entries added to the container will have hops incremented.
*/
template <typename EndpointContainer>
void append (Endpoints::size_type n, EndpointContainer& c)
{
n = std::min (n, m_endpoints.size());
c.reserve (c.size () + n);
if (m_remain < n)
refill ();
for (cyclic_iterator <Buckets::iterator> iter (
m_buckets.begin (), m_buckets.begin (), m_buckets.end()); n;)
{
Bucket& bucket (*iter++);
if (! bucket.empty ())
{
c.emplace_back (*bucket.back ());
bucket.pop_back ();
++c.back ().hops;
--n;
--m_remain;
}
}
consistency_check (is_consistent ());
}
/** Retrieve a fresh set of endpoints, preferring high hops.
The entries added to the container will have hops incremented.
*/
template <typename EndpointContainer>
void reverse_append (Endpoints::size_type n, EndpointContainer& c)
{
n = std::min (n, m_endpoints.size());
c.reserve (c.size () + n);
if (m_remain < n)
refill ();
for (cyclic_iterator <Buckets::reverse_iterator> iter (
m_buckets.rbegin (), m_buckets.rbegin (), m_buckets.rend()); n;)
{
Bucket& bucket (*iter++);
if (! bucket.empty ())
{
c.emplace_back (*bucket.back ());
bucket.pop_back ();
++c.back ().hops;
--n;
--m_remain;
}
}
consistency_check (is_consistent ());
}
};
}
}
#endif

View File

@@ -20,13 +20,24 @@
namespace ripple {
namespace PeerFinder {
//------------------------------------------------------------------------------
namespace detail {
}
//------------------------------------------------------------------------------
class LivecacheTests : public UnitTest
{
public:
manual_clock <clock_type::duration> m_clock;
// Add the address as an endpoint
void add (uint32 index, uint16 port, Livecache& c)
template <class C>
void add (uint32 index, uint16 port, C& c)
{
Endpoint ep;
ep.hops = 0;
@@ -39,7 +50,7 @@ public:
{
beginTestCase ("fetch");
Livecache c (m_clock, Journal());
Livecache <> c (m_clock, Journal());
add (1, 1, c);
add (2, 1, c);
@@ -52,32 +63,7 @@ public:
add (6, 2, c);
add (7, 1, c);
Endpoints const eps (c.fetch_unique ());
struct IsAddr
{
explicit IsAddr (uint32 index_)
: index (index_)
{ }
bool operator() (Endpoint const& ep) const
{ return ep.address.to_v4().value == index; }
uint32 index;
};
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (1)) == 1);
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (2)) == 1);
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (3)) == 1);
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (4)) == 1);
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (5)) == 1);
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (6)) == 1);
expect (std::count_if (
eps.begin(), eps.end(), IsAddr (7)) == 1);
// VFALCO TODO!!!
pass();
}

View File

@@ -20,11 +20,151 @@
#ifndef RIPPLE_PEERFINDER_LIVECACHE_H_INCLUDED
#define RIPPLE_PEERFINDER_LIVECACHE_H_INCLUDED
#include <unordered_map>
#include "../../../beast/beast/container/aged_map.h"
#include "../../../beast/beast/type_traits/maybe_const.h"
#include <boost/intrusive/list.hpp>
#include <boost/iterator/transform_iterator.hpp>
namespace ripple {
namespace PeerFinder {
template <class>
class Livecache;
namespace detail {
class LivecacheBase
{
protected:
struct Element
: boost::intrusive::list_base_hook <>
{
Element (Endpoint const& endpoint_)
: endpoint (endpoint_)
{
}
Endpoint endpoint;
};
typedef boost::intrusive::make_list <Element,
boost::intrusive::constant_time_size <false>
>::type list_type;
public:
/** A list of Endpoint at the same hops
This is a lightweight wrapper around a reference to the underlying
container.
*/
template <bool IsConst>
class Hop
{
public:
// Iterator transformation to extract the endpoint from Element
struct Transform
: public std::unary_function <Element, Endpoint>
{
Endpoint const& operator() (Element const& e) const
{
return e.endpoint;
}
};
public:
typedef boost::transform_iterator <Transform,
typename list_type::const_iterator> iterator;
typedef iterator const_iterator;
typedef boost::transform_iterator <Transform,
typename list_type::const_reverse_iterator> reverse_iterator;
typedef reverse_iterator const_reverse_iterator;
iterator begin () const
{
return iterator (m_list.get().cbegin(),
Transform());
}
iterator cbegin () const
{
return iterator (m_list.get().cbegin(),
Transform());
}
iterator end () const
{
return iterator (m_list.get().cend(),
Transform());
}
iterator cend () const
{
return iterator (m_list.get().cend(),
Transform());
}
reverse_iterator rbegin () const
{
return reverse_iterator (m_list.get().crbegin(),
Transform());
}
reverse_iterator crbegin () const
{
return reverse_iterator (m_list.get().crbegin(),
Transform());
}
reverse_iterator rend () const
{
return reverse_iterator (m_list.get().crend(),
Transform());
}
reverse_iterator crend () const
{
return reverse_iterator (m_list.get().crend(),
Transform());
}
// move the element to the end of the container
void move_back (const_iterator pos)
{
auto& e (const_cast <Element&>(*pos.base()));
m_list.get().erase (m_list.get().iterator_to (e));
m_list.get().push_back (e);
}
private:
explicit Hop (typename maybe_const <
IsConst, list_type>::type& list)
: m_list (list)
{
}
friend class LivecacheBase;
std::reference_wrapper <typename maybe_const <
IsConst, list_type>::type> m_list;
};
protected:
// Work-around to call Hop's private constructor from Livecache
template <bool IsConst>
static Hop <IsConst> make_hop (typename maybe_const <
IsConst, list_type>::type& list)
{
return Hop <IsConst> (list);
}
};
}
//------------------------------------------------------------------------------
/** The Livecache holds the short-lived relayed Endpoint messages.
Since peers only advertise themselves when they have open slots,
@@ -37,234 +177,382 @@ namespace PeerFinder {
launches or for bootstrapping, because they do not have verifiable
and locally observed uptime and connectibility information.
*/
class Livecache
template <class Allocator = std::allocator <char>>
class Livecache : protected detail::LivecacheBase
{
public:
struct Entry;
private:
typedef aged_map <
IP::Endpoint,
Element,
std::chrono::seconds,
std::less <IP::Endpoint>,
Allocator
> cache_type;
typedef List <Entry> EntryList;
struct Entry : public EntryList::Node
{
Entry (Endpoint const& endpoint_,
clock_type::time_point const& whenExpires_)
: endpoint (endpoint_)
, whenExpires (whenExpires_)
{
}
Endpoint endpoint;
clock_type::time_point whenExpires;
};
typedef std::set <Endpoint, LessEndpoints> SortedTable;
typedef std::unordered_map <IP::Endpoint, Entry> AddressTable;
clock_type& m_clock;
Journal m_journal;
AddressTable m_byAddress;
SortedTable m_bySorted;
// Tracks all the cached endpoints stored in the endpoint table
// in oldest-to-newest order. The oldest item is at the head.
EntryList m_list;
cache_type m_cache;
public:
typedef Allocator allocator_type;
/** Create the cache. */
Livecache (
clock_type& clock,
Journal journal)
: m_clock (clock)
, m_journal (journal)
Journal journal,
Allocator alloc = Allocator());
//
// Iteration by hops
//
// The range [begin, end) provides a sequence of list_type
// where each list contains endpoints at a given hops.
//
class hops_t
{
}
private:
// An endpoint at hops=0 represents the local node.
// Endpoints coming in at maxHops are stored at maxHops +1,
// but not given out (since they would exceed maxHops). They
// are used for automatic connection attempts.
//
typedef std::array <int, 1 + Tuning::maxHops + 1> Histogram;
typedef std::array <list_type, 1 + Tuning::maxHops + 1> lists_type;
template <bool IsConst>
struct Transform
: public std::unary_function <
typename lists_type::value_type, Hop <IsConst>>
{
Hop <IsConst> operator() (typename maybe_const <
IsConst, typename lists_type::value_type>::type& list) const
{
return make_hop <IsConst> (list);
}
};
public:
typedef boost::transform_iterator <Transform <false>,
typename lists_type::iterator> iterator;
typedef boost::transform_iterator <Transform <true>,
typename lists_type::const_iterator> const_iterator;
typedef boost::transform_iterator <Transform <false>,
typename lists_type::reverse_iterator> reverse_iterator;
typedef boost::transform_iterator <Transform <true>,
typename lists_type::const_reverse_iterator> const_reverse_iterator;
iterator begin ()
{
return iterator (m_lists.begin(),
Transform <false>());
}
const_iterator begin () const
{
return const_iterator (m_lists.cbegin(),
Transform <true>());
}
const_iterator cbegin () const
{
return const_iterator (m_lists.cbegin(),
Transform <true>());
}
iterator end ()
{
return iterator (m_lists.end(),
Transform <false>());
}
const_iterator end () const
{
return const_iterator (m_lists.cend(),
Transform <true>());
}
const_iterator cend () const
{
return const_iterator (m_lists.cend(),
Transform <true>());
}
reverse_iterator rbegin ()
{
return reverse_iterator (m_lists.rbegin(),
Transform <false>());
}
const_reverse_iterator rbegin () const
{
return const_reverse_iterator (m_lists.crbegin(),
Transform <true>());
}
const_reverse_iterator crbegin () const
{
return const_reverse_iterator (m_lists.crbegin(),
Transform <true>());
}
reverse_iterator rend ()
{
return reverse_iterator (m_lists.rend(),
Transform <false>());
}
const_reverse_iterator rend () const
{
return const_reverse_iterator (m_lists.crend(),
Transform <true>());
}
const_reverse_iterator crend () const
{
return const_reverse_iterator (m_lists.crend(),
Transform <true>());
}
/** Shuffle each hop list. */
void shuffle ();
std::string histogram() const;
private:
explicit hops_t (Allocator const& alloc);
void insert (Element& e);
// Reinsert e at a new hops
void reinsert (Element& e, int hops);
void remove (Element& e);
friend class Livecache;
lists_type m_lists;
Histogram m_hist;
} hops;
/** Returns `true` if the cache is empty. */
bool empty () const
{
return m_byAddress.empty ();
return m_cache.empty ();
}
/** Returns the number of entries in the cache. */
AddressTable::size_type size() const
typename cache_type::size_type size() const
{
return m_byAddress.size();
return m_cache.size();
}
/** Erase entries whose time has expired. */
void sweep ()
{
auto const now (m_clock.now ());
AddressTable::size_type count (0);
for (EntryList::iterator iter (m_list.begin());
iter != m_list.end();)
{
// Short circuit the loop since the list is sorted
if (iter->whenExpires > now)
break;
Entry& entry (*iter);
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Livecache expired " << entry.endpoint.address;
// Must erase from list before map
iter = m_list.erase (iter);
meets_postcondition (m_bySorted.erase (
entry.endpoint) == 1);
meets_postcondition (m_byAddress.erase (
entry.endpoint.address) == 1);
++count;
}
void expire ();
if (count > 0)
{
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Livecache expired " << count <<
((count > 1) ? " entries" : " entry");
}
}
/** Creates or updates an existing entry based on a new message. */
void insert (Endpoint endpoint)
{
// Caller is responsible for validation
check_precondition (endpoint.hops <= Tuning::maxHops);
auto now (m_clock.now ());
auto const whenExpires (now + Tuning::liveCacheSecondsToLive);
std::pair <AddressTable::iterator, bool> result (
m_byAddress.emplace (std::piecewise_construct,
std::make_tuple (endpoint.address),
std::make_tuple (endpoint, whenExpires)));
Entry& entry (result.first->second);
// Drop duplicates at higher hops
if (! result.second && (endpoint.hops > entry.endpoint.hops))
{
std::size_t const excess (
endpoint.hops - entry.endpoint.hops);
if (m_journal.trace) m_journal.trace << leftw(18) <<
"Livecache drop " << endpoint.address <<
" at hops +" << excess;
return;
}
// Update metadata if the address already exists
if (! result.second)
{
meets_postcondition (m_bySorted.erase (
result.first->second.endpoint) == 1);
if (endpoint.hops < entry.endpoint.hops)
{
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Livecache update " << endpoint.address <<
" at hops " << endpoint.hops;
entry.endpoint.hops = endpoint.hops;
}
else
{
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Livecache refresh " << endpoint.address <<
" at hops " << endpoint.hops;
}
entry.whenExpires = whenExpires;
m_list.erase (m_list.iterator_to(entry));
}
else
{
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Livecache insert " << endpoint.address <<
" at hops " << endpoint.hops;
}
meets_postcondition (m_bySorted.insert (entry.endpoint).second);
m_list.push_back (entry);
}
/** Returns the full set of endpoints in a Giveaways class. */
Giveaways giveaways()
{
Endpoints endpoints;
endpoints.reserve (m_list.size());
for (EntryList::const_iterator iter (m_list.cbegin());
iter != m_list.cend(); ++iter)
{
endpoints.push_back (iter->endpoint);
endpoints.back ().hops;
}
if (! endpoints.empty())
return Giveaways (endpoints);
return Giveaways (endpoints);
}
/** Returns an ordered list all entries with unique addresses. */
Endpoints fetch_unique () const
{
Endpoints result;
if (m_bySorted.empty ())
return result;
result.reserve (m_bySorted.size ());
Endpoint const& front (*m_bySorted.begin());
IP::Address prev (front.address.address());
result.emplace_back (front);
for (SortedTable::const_iterator iter (++m_bySorted.begin());
iter != m_bySorted.end(); ++iter)
{
IP::Address const addr (iter->address.address());
if (addr != prev)
{
result.emplace_back (*iter);
++result.back().hops;
prev = addr;
}
}
return result;
}
/** Creates or updates an existing Element based on a new message. */
void insert (Endpoint const& ep);
/** Produce diagnostic output. */
void dump (Journal::ScopedStream& ss) const
{
ss << std::endl << std::endl <<
"Livecache (size " << m_byAddress.size() << ")";
for (AddressTable::const_iterator iter (m_byAddress.begin());
iter != m_byAddress.end(); ++iter)
{
Entry const& entry (iter->second);
ss << std::endl <<
entry.endpoint.address << ", " <<
entry.endpoint.hops << " hops";
}
}
/** Returns a histogram of message counts by hops. */
typedef boost::array <int, Tuning::maxHops + 1> Histogram;
Histogram histogram () const
{
Histogram h;
for (Histogram::iterator iter (h.begin());
iter != h.end(); ++iter)
*iter = 0;
for (EntryList::const_iterator iter (m_list.begin());
iter != m_list.end(); ++iter)
++h[iter->endpoint.hops];
return h;
}
void dump (Journal::ScopedStream& ss) const;
/** Output statistics. */
void onWrite (PropertyStream::Map& map)
{
clock_type::time_point const now (m_clock.now ());
map ["size"] = size ();
PropertyStream::Set set ("entries", map);
for (auto entry : m_byAddress)
{
PropertyStream::Map item (set);
Entry const& e (entry.second);
item ["hops"] = e.endpoint.hops;
item ["address"] = e.endpoint.address.to_string ();
std::stringstream ss;
ss << e.whenExpires - now;
item ["expires"] = ss.str();
}
}
void onWrite (PropertyStream::Map& map);
};
//------------------------------------------------------------------------------
template <class Allocator>
Livecache <Allocator>::Livecache (
clock_type& clock,
Journal journal,
Allocator alloc)
: m_journal (journal)
, m_cache (clock, alloc)
, hops (alloc)
{
}
template <class Allocator>
void
Livecache <Allocator>::expire()
{
std::size_t n (0);
typename cache_type::time_point const expired (
m_cache.clock().now() - Tuning::liveCacheSecondsToLive);
for (auto iter (m_cache.chronological.begin());
iter != m_cache.chronological.end() && iter.when() <= expired;)
{
Element& e (iter->second);
hops.remove (e);
iter = m_cache.erase (iter);
++n;
}
if (n > 0)
{
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Livecache expired " << n <<
((n > 1) ? " entries" : " entry");
}
}
template <class Allocator>
void Livecache <Allocator>::insert (Endpoint const& ep)
{
// The caller already incremented hop, so if we got a
// message at maxHops we will store it at maxHops + 1.
// This means we won't give out the address to other peers
// but we will use it to make connections and hand it out
// when redirecting.
//
assert (ep.hops <= (Tuning::maxHops + 1));
std::pair <typename cache_type::iterator, bool> result (
m_cache.emplace (ep.address, ep));
Element& e (result.first->second);
if (result.second)
{
hops.insert (e);
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Livecache insert " << ep.address <<
" at hops " << ep.hops;
return;
}
else if (! result.second && (ep.hops > e.endpoint.hops))
{
// Drop duplicates at higher hops
std::size_t const excess (
ep.hops - e.endpoint.hops);
if (m_journal.trace) m_journal.trace << leftw(18) <<
"Livecache drop " << ep.address <<
" at hops +" << excess;
return;
}
m_cache.touch (result.first);
// Address already in the cache so update metadata
if (ep.hops < e.endpoint.hops)
{
hops.reinsert (e, ep.hops);
if (m_journal.debug) m_journal.debug << leftw (18) <<
"Livecache update " << ep.address <<
" at hops " << ep.hops;
}
else
{
if (m_journal.trace) m_journal.trace << leftw (18) <<
"Livecache refresh " << ep.address <<
" at hops " << ep.hops;
}
}
template <class Allocator>
void
Livecache <Allocator>::dump (Journal::ScopedStream& ss) const
{
ss << std::endl << std::endl <<
"Livecache (size " << m_cache.size() << ")";
for (auto const& entry : m_cache)
{
auto const& e (entry.second);
ss << std::endl <<
e.endpoint.address << ", " <<
e.endpoint.hops << " hops";
}
}
template <class Allocator>
void
Livecache <Allocator>::onWrite (PropertyStream::Map& map)
{
typename cache_type::time_point const expired (
m_cache.clock().now() - Tuning::liveCacheSecondsToLive);
map ["size"] = size ();
map ["hist"] = hops.histogram();
PropertyStream::Set set ("entries", map);
for (auto iter (m_cache.cbegin()); iter != m_cache.cend(); ++iter)
{
auto const& e (iter->second);
PropertyStream::Map item (set);
item ["hops"] = e.endpoint.hops;
item ["address"] = e.endpoint.address.to_string ();
std::stringstream ss;
ss << iter.when() - expired;
item ["expires"] = ss.str();
}
}
//------------------------------------------------------------------------------
template <class Allocator>
void
Livecache <Allocator>::hops_t::shuffle()
{
for (auto& list : m_lists)
{
std::vector <std::reference_wrapper <Element>> v;
v.reserve (list.size());
std::copy (list.begin(), list.end(),
std::back_inserter (v));
std::random_shuffle (v.begin(), v.end());
list.clear();
for (auto& e : v)
list.push_back (e);
}
}
template <class Allocator>
std::string
Livecache <Allocator>::hops_t::histogram() const
{
std::stringstream ss;
for (auto i : m_hist)
ss <<
i <<
((i < Tuning::maxHops + 1) ? ", " : "");
return ss.str();
}
template <class Allocator>
Livecache <Allocator>::hops_t::hops_t (Allocator const& alloc)
{
std::fill (m_hist.begin(), m_hist.end(), 0);
}
template <class Allocator>
void
Livecache <Allocator>::hops_t::insert (Element& e)
{
assert (e.endpoint.hops >= 0 &&
e.endpoint.hops <= Tuning::maxHops + 1);
// This has security implications without a shuffle
m_lists [e.endpoint.hops].push_front (e);
++m_hist [e.endpoint.hops];
}
template <class Allocator>
void
Livecache <Allocator>::hops_t::reinsert (Element& e, int hops)
{
assert (hops >= 0 && hops <= Tuning::maxHops + 1);
list_type& list (m_lists [e.endpoint.hops]);
list.erase (list.iterator_to (e));
--m_hist [e.endpoint.hops];
e.endpoint.hops = hops;
insert (e);
}
template <class Allocator>
void
Livecache <Allocator>::hops_t::remove (Element& e)
{
--m_hist [e.endpoint.hops];
list_type& list (m_lists [e.endpoint.hops]);
list.erase (list.iterator_to (e));
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -36,9 +36,7 @@ public:
SerializedContext m_context;
CheckerAdapter m_checker;
Logic m_logic;
DeadlineTimer m_connectTimer;
DeadlineTimer m_messageTimer;
DeadlineTimer m_cacheTimer;
DeadlineTimer m_secondsTimer;
//--------------------------------------------------------------------------
@@ -56,9 +54,7 @@ public:
, m_store (journal)
, m_checker (m_context, m_queue)
, m_logic (clock, callback, m_store, m_checker, journal)
, m_connectTimer (this)
, m_messageTimer (this)
, m_cacheTimer (this)
, m_secondsTimer (this)
{
}
@@ -223,9 +219,7 @@ public:
m_journal.debug << "Stopping";
m_checker.cancel ();
m_logic.stop ();
m_connectTimer.cancel();
m_messageTimer.cancel();
m_cacheTimer.cancel();
m_secondsTimer.cancel();
m_queue.dispatch (
m_context.wrap (
bind (&Thread::signalThreadShouldExit, this)));
@@ -248,35 +242,14 @@ public:
void onDeadlineTimer (DeadlineTimer& timer)
{
if (timer == m_connectTimer)
if (timer == m_secondsTimer)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::makeOutgoingConnections, &m_logic)));
bind (&Logic::periodicActivity, &m_logic)));
m_connectTimer.setExpiration (Tuning::secondsPerConnect);
m_secondsTimer.setExpiration (Tuning::secondsPerConnect);
}
else if (timer == m_messageTimer)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::broadcast, &m_logic)));
m_messageTimer.setExpiration (Tuning::secondsPerMessage);
}
else if (timer == m_cacheTimer)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::sweepCache, &m_logic)));
m_cacheTimer.setExpiration (Tuning::liveCacheSecondsToLive);
}
// VFALCO NOTE Bit of a hack here...
m_queue.dispatch (
m_context.wrap (
bind (&Logic::periodicActivity, &m_logic)));
}
void init ()
@@ -299,12 +272,7 @@ public:
m_logic.load ();
}
m_connectTimer.setExpiration (Tuning::secondsPerConnect);
m_messageTimer.setExpiration (Tuning::secondsPerMessage);
m_cacheTimer.setExpiration (Tuning::liveCacheSecondsToLive);
m_queue.post (m_context.wrap (
bind (&Logic::makeOutgoingConnections, &m_logic)));
m_secondsTimer.setExpiration (std::chrono::seconds (1));
}
void run ()

View File

@@ -0,0 +1,70 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "RedirectHandouts.h"
namespace ripple {
namespace PeerFinder {
RedirectHandouts::RedirectHandouts (SlotImp::ptr const& slot)
: m_slot (slot)
{
m_list.reserve (Tuning::redirectEndpointCount);
}
bool
RedirectHandouts::try_insert (Endpoint const& ep)
{
if (full ())
return false;
// VFALCO NOTE This check can be removed when we provide the
// addresses in a peer HTTP handshake instead of
// the tmENDPOINTS message.
//
if (ep.hops > Tuning::maxHops)
return false;
// Don't send them our address
if (ep.hops == 0)
return false;
// Don't send them their own address
if (m_slot->remote_endpoint().address() ==
ep.address.address())
return false;
// Make sure the address isn't already in our list
if (std::any_of (m_list.begin(), m_list.end(),
[&ep](Endpoint const& other)
{
// Ignore port for security reasons
return other.address.address() == ep.address.address();
}))
{
return false;
}
m_list.emplace_back (ep.address, ep.hops);
return true;
}
}
}

View File

@@ -17,13 +17,44 @@
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_SEEN_H_INCLUDED
#define RIPPLE_PEERFINDER_SEEN_H_INCLUDED
#ifndef RIPPLE_PEERFINDER_REDIRECTHANDOUTS_H_INCLUDED
#define RIPPLE_PEERFINDER_REDIRECTHANDOUTS_H_INCLUDED
#include "SlotImp.h"
#include "Tuning.h"
namespace ripple {
namespace PeerFinder {
/** Tracks endpoints we've seen from a peer. */
/** Receives handouts for redirecting a connection.
An incoming connection request is redirected when we are full on slots.
*/
class RedirectHandouts
{
public:
RedirectHandouts (SlotImp::ptr const& slot);
bool full () const
{
return m_list.size() >= Tuning::redirectEndpointCount;
}
SlotImp::ptr const& slot () const
{
return m_slot;
}
std::vector <Endpoint> const& list() const
{
return m_list;
}
bool try_insert (Endpoint const& ep);
private:
SlotImp::ptr m_slot;
std::vector <Endpoint> m_list;
};
}
}

View File

@@ -0,0 +1,70 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "SlotHandouts.h"
namespace ripple {
namespace PeerFinder {
SlotHandouts::SlotHandouts (SlotImp::ptr const& slot)
: m_slot (slot)
{
m_list.reserve (Tuning::numberOfEndpoints);
}
bool
SlotHandouts::try_insert (Endpoint const& ep)
{
if (full ())
return false;
if (ep.hops > Tuning::maxHops)
return false;
if (m_slot->recent.filter (ep.address, ep.hops))
return false;
// Don't send them their own address
if (m_slot->remote_endpoint().address() ==
ep.address.address())
return false;
// Make sure the address isn't already in our list
if (std::any_of (m_list.begin(), m_list.end(),
[&ep](Endpoint const& other)
{
// Ignore port for security reasons
return other.address.address() == ep.address.address();
}))
return false;
m_list.emplace_back (ep.address, ep.hops);
// Insert into this slot's recent table. Although the endpoint
// didn't come from the slot, adding it to the slot's table
// prevents us from sending it again until it has expired from
// the other end's cache.
//
m_slot->recent.insert (ep.address, ep.hops);
return true;
}
}
}

View File

@@ -17,34 +17,46 @@
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_SORTS_H_INCLUDED
#define RIPPLE_PEERFINDER_SORTS_H_INCLUDED
#ifndef RIPPLE_PEERFINDER_SLOTHANDOUTS_H_INCLUDED
#define RIPPLE_PEERFINDER_SLOTHANDOUTS_H_INCLUDED
#include "SlotImp.h"
#include "Tuning.h"
namespace ripple {
namespace PeerFinder {
/** Total ordering for Endpoint.
The ordering must have these properties:
- Endpoints with addresses differing only by port should be
sorted adjacent, by descending hop count.
- The port number must participate in the ordering
*/
struct LessEndpoints
/** Functor to receive endpoints for a slot during handout. */
class SlotHandouts
{
bool operator() (Endpoint const& lhs, Endpoint const& rhs) const
public:
explicit SlotHandouts (SlotImp::ptr const& slot);
bool full () const
{
if (lhs.address.address() < rhs.address.address())
return true;
if (lhs.address.address() > rhs.address.address())
return false;
// Break ties by preferring higher hops
if (lhs.hops > rhs.hops)
return true;
return lhs.address.port () < rhs.address.port ();
return m_list.size() >= Tuning::numberOfEndpoints;
}
void insert (Endpoint const& ep)
{
m_list.push_back (ep);
}
SlotImp::ptr const& slot () const
{
return m_slot;
}
std::vector <Endpoint> const& list() const
{
return m_list;
}
bool try_insert (Endpoint const& ep);
private:
SlotImp::ptr m_slot;
std::vector <Endpoint> m_list;
};
}

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "SlotImp.h"
namespace ripple {
namespace PeerFinder {
SlotImp::SlotImp (IP::Endpoint const& local_endpoint,
IP::Endpoint const& remote_endpoint, bool fixed,
clock_type& clock)
: recent (clock)
, m_inbound (true)
, m_fixed (fixed)
, m_cluster (false)
, m_state (accept)
, m_remote_endpoint (remote_endpoint)
, m_local_endpoint (local_endpoint)
, checked (false)
, canAccept (false)
, connectivityCheckInProgress (false)
{
}
SlotImp::SlotImp (IP::Endpoint const& remote_endpoint,
bool fixed, clock_type& clock)
: recent (clock)
, m_inbound (false)
, m_fixed (fixed)
, m_cluster (false)
, m_state (connect)
, m_remote_endpoint (remote_endpoint)
, checked (true)
, canAccept (true)
, connectivityCheckInProgress (false)
{
}
void
SlotImp::state (State state_)
{
// Must go through activate() to set active state
assert (state_ != active);
// The state must be different
assert (state_ != m_state);
// You can't transition into the initial states
assert (state_ != accept && state_ != connect);
// Can only become connected from outbound connect state
assert (state_ != connected || (! m_inbound && m_state == connect));
// Can't gracefully close on an outbound connection attempt
assert (state_ != closing || m_state != connect);
m_state = state_;
}
void
SlotImp::activate (clock_type::time_point const& now)
{
// Can only become active from the accept or connected state
assert (m_state == accept || m_state == connected);
m_state = active;
whenAcceptEndpoints = now;
}
//------------------------------------------------------------------------------
Slot::~Slot ()
{
}
//------------------------------------------------------------------------------
SlotImp::recent_t::recent_t (clock_type& clock)
: cache (clock)
{
}
void
SlotImp::recent_t::insert (IP::Endpoint const& ep, int hops)
{
auto const result (cache.emplace (ep, hops));
if (! result.second)
{
// NOTE Other logic depends on this <= inequality.
if (hops <= result.first->second)
{
result.first->second = hops;
cache.touch (result.first);
}
}
}
bool
SlotImp::recent_t::filter (IP::Endpoint const& ep, int hops)
{
auto const iter (cache.find (ep));
if (iter == cache.end())
return false;
// We avoid sending an endpoint if we heard it
// from them recently at the same or lower hop count.
// NOTE Other logic depends on this <= inequality.
return iter->second <= hops;
}
void
SlotImp::recent_t::expire ()
{
beast::expire (cache,
Tuning::liveCacheSecondsToLive);
}
}
}

View File

@@ -22,6 +22,9 @@
#include "../api/Slot.h"
#include "../../../beast/beast/container/aged_unordered_map.h"
#include "../../../beast/beast/container/aged_container_utility.h"
#include <boost/optional.hpp>
namespace ripple {
@@ -29,40 +32,20 @@ namespace PeerFinder {
class SlotImp : public Slot
{
private:
typedef beast::aged_unordered_map <IP::Endpoint, int> recent_type;
public:
typedef std::shared_ptr <SlotImp> ptr;
// inbound
SlotImp (IP::Endpoint const& local_endpoint,
IP::Endpoint const& remote_endpoint, bool fixed)
: m_inbound (true)
, m_fixed (fixed)
, m_cluster (false)
, m_state (accept)
, m_remote_endpoint (remote_endpoint)
, m_local_endpoint (local_endpoint)
, checked (false)
, canAccept (false)
, connectivityCheckInProgress (false)
{
}
IP::Endpoint const& remote_endpoint, bool fixed,
clock_type& clock);
// outbound
SlotImp (IP::Endpoint const& remote_endpoint, bool fixed)
: m_inbound (false)
, m_fixed (fixed)
, m_cluster (false)
, m_state (connect)
, m_remote_endpoint (remote_endpoint)
, checked (true)
, canAccept (true)
, connectivityCheckInProgress (false)
{
}
~SlotImp ()
{
}
SlotImp (IP::Endpoint const& remote_endpoint,
bool fixed, clock_type& clock);
bool inbound () const
{
@@ -99,38 +82,6 @@ public:
return m_public_key;
}
//--------------------------------------------------------------------------
void state (State state_)
{
// Must go through activate() to set active state
assert (state_ != active);
// The state must be different
assert (state_ != m_state);
// You can't transition into the initial states
assert (state_ != accept && state_ != connect);
// Can only become connected from outbound connect state
assert (state_ != connected || (! m_inbound && m_state == connect));
// Can't gracefully close on an outbound connection attempt
assert (state_ != closing || m_state != connect);
m_state = state_;
}
void activate (clock_type::time_point const& now)
{
// Can only become active from the accept or connected state
assert (m_state == accept || m_state == connected);
m_state = active;
whenSendEndpoints = now;
whenAcceptEndpoints = now;
}
void local_endpoint (IP::Endpoint const& endpoint)
{
m_local_endpoint = endpoint;
@@ -151,6 +102,43 @@ public:
m_cluster = cluster_;
}
//--------------------------------------------------------------------------
void state (State state_);
void activate (clock_type::time_point const& now);
// "Memberspace"
//
// The set of all recent addresses that we have seen from this peer.
// We try to avoid sending a peer the same addresses they gave us.
//
class recent_t
{
public:
explicit recent_t (clock_type& clock);
/** Called for each valid endpoint received for a slot.
We also insert messages that we send to the slot to prevent
sending a slot the same address too frequently.
*/
void insert (IP::Endpoint const& ep, int hops);
/** Returns `true` if we should not send endpoint to the slot. */
bool filter (IP::Endpoint const& ep, int hops);
private:
void expire ();
friend class SlotImp;
recent_type cache;
} recent;
void expire()
{
recent.expire();
}
private:
bool const m_inbound;
bool const m_fixed;
@@ -175,27 +163,13 @@ public:
// progress. Valid always.
bool connectivityCheckInProgress;
// The time after which we will send the peer mtENDPOINTS
clock_type::time_point whenSendEndpoints;
// The time after which we will accept mtENDPOINTS from the peer
// This is to prevent flooding or spamming. Receipt of mtENDPOINTS
// sooner than the allotted time should impose a load charge.
//
clock_type::time_point whenAcceptEndpoints;
// The set of all recent IP::Endpoint that we have seen from this peer.
// We try to avoid sending a peer the same addresses they gave us.
//
//std::set <IP::Endpoint> received;
};
//------------------------------------------------------------------------------
Slot::~Slot ()
{
}
}
}

View File

@@ -29,18 +29,17 @@ class Store
public:
virtual ~Store () { }
struct SavedBootstrapAddress
// load the bootstrap cache
typedef std::function <void (IP::Endpoint, int)> load_callback;
virtual std::size_t load (load_callback const& cb) = 0;
// save the bootstrap cache
struct Entry
{
IP::Endpoint address;
std::chrono::seconds cumulativeUptime;
int connectionValence;
IP::Endpoint endpoint;
int valence;
};
virtual std::vector <SavedBootstrapAddress>
loadBootstrapCache () = 0;
virtual void updateBootstrapCache (
std::vector <SavedBootstrapAddress> const& list) = 0;
virtual void save (std::vector <Entry> const& v) = 0;
};
}

View File

@@ -36,7 +36,7 @@ public:
enum
{
// This determines the on-database format of the data
currentSchemaVersion = 3
currentSchemaVersion = 4
};
explicit StoreSqdb (Journal journal = Journal())
@@ -54,136 +54,95 @@ public:
m_journal.info << "Opening database at '" << file.getFullPathName() << "'";
if (!error)
if (! error)
error = init ();
if (!error)
if (! error)
error = update ();
return error;
}
// Loads the entire stored bootstrap cache and returns it as an array.
// Loads the bootstrap cache, calling the callback for each entry
//
std::vector <SavedBootstrapAddress> loadBootstrapCache ()
std::size_t load (load_callback const& cb)
{
std::vector <SavedBootstrapAddress> list;
std::size_t n (0);
Error error;
std::string s;
int valence;
sqdb::statement st = (m_session.prepare <<
"SELECT "
" address, "
" valence "
"FROM PeerFinder_BootstrapCache "
, sqdb::into (s)
, sqdb::into (valence)
);
// Get the count
std::size_t count;
if (! error)
if (st.execute_and_fetch (error))
{
m_session.once (error) <<
"SELECT COUNT(*) FROM PeerFinder_BootstrapCache "
,sqdb::into (count)
;
}
if (error)
{
report (error, __FILE__, __LINE__);
return list;
}
list.reserve (count);
{
std::string s;
std::chrono::seconds::rep uptimeSeconds;
int connectionValence;
sqdb::statement st = (m_session.prepare <<
"SELECT "
" address, "
" uptime, "
" valence "
"FROM PeerFinder_BootstrapCache "
, sqdb::into (s)
, sqdb::into (uptimeSeconds)
, sqdb::into (connectionValence)
);
if (st.execute_and_fetch (error))
do
{
do
IP::Endpoint const endpoint (
IP::Endpoint::from_string (s));
if (! is_unspecified (endpoint))
{
SavedBootstrapAddress entry;
entry.address = IP::Endpoint::from_string (s);
if (! is_unspecified (entry.address))
{
entry.cumulativeUptime = std::chrono::seconds (uptimeSeconds);
entry.connectionValence = connectionValence;
list.push_back (entry);
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
cb (endpoint, valence);
++n;
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
while (st.fetch (error));
}
while (st.fetch (error));
}
if (error)
{
report (error, __FILE__, __LINE__);
}
return list;
return n;
}
// Overwrites the stored bootstrap cache with the specified array.
//
void updateBootstrapCache (
std::vector <SavedBootstrapAddress> const& list)
void save (std::vector <Entry> const& v)
{
Error error;
sqdb::transaction tr (m_session);
m_session.once (error) <<
"DELETE FROM PeerFinder_BootstrapCache";
if (! error)
{
std::string s;
std::chrono::seconds::rep uptimeSeconds;
int connectionValence;
int valence;
sqdb::statement st = (m_session.prepare <<
"INSERT INTO PeerFinder_BootstrapCache ( "
" address, "
" uptime, "
" valence "
") VALUES ( "
" ?, ?, ? "
" ?, ? "
");"
, sqdb::use (s)
, sqdb::use (uptimeSeconds)
, sqdb::use (connectionValence)
, sqdb::use (valence)
);
for (std::vector <SavedBootstrapAddress>::const_iterator iter (
list.begin()); !error && iter != list.end(); ++iter)
for (auto const& e : v)
{
s = to_string (iter->address);
uptimeSeconds = iter->cumulativeUptime.count ();
connectionValence = iter->connectionValence;
s = to_string (e.endpoint);
valence = e.valence;
st.execute_and_fetch (error);
if (error)
break;
}
}
if (! error)
{
error = tr.commit();
}
if (error)
{
@@ -203,7 +162,7 @@ public:
// get version
int version (0);
if (!error)
if (! error)
{
m_session.once (error) <<
"SELECT "
@@ -223,35 +182,155 @@ public:
}
}
if (!error && version != currentSchemaVersion)
{
m_journal.info <<
"Updating database to version " << currentSchemaVersion;
}
if (!error)
{
if (version < 3)
if (version < currentSchemaVersion)
m_journal.info <<
"Updating database to version " << currentSchemaVersion;
else if (version > currentSchemaVersion)
error.fail (__FILE__, __LINE__,
"The PeerFinder database version is higher than expected");
}
if (! error && version < 4)
{
//
// Remove the "uptime" column from the bootstrap table
//
if (! error)
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache_Next ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" address TEXT UNIQUE NOT NULL, "
" valence INTEGER"
");"
;
if (! error)
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Next_Index ON "
" PeerFinder_BootstrapCache_Next "
" ( address ); "
;
std::size_t count;
if (! error)
m_session.once (error) <<
"SELECT COUNT(*) FROM PeerFinder_BootstrapCache "
,sqdb::into (count)
;
std::vector <Store::Entry> list;
if (! error)
{
if (!error)
m_session.once (error) <<
"DROP TABLE IF EXISTS LegacyEndpoints";
list.reserve (count);
std::string s;
int valence;
sqdb::statement st = (m_session.prepare <<
"SELECT "
" address, "
" valence "
"FROM PeerFinder_BootstrapCache "
, sqdb::into (s)
, sqdb::into (valence)
);
if (!error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinderLegacyEndpoints";
if (!error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints";
if (!error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints_Index";
if (st.execute_and_fetch (error))
{
do
{
Store::Entry entry;
entry.endpoint = IP::Endpoint::from_string (s);
if (! is_unspecified (entry.endpoint))
{
entry.valence = valence;
list.push_back (entry);
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
}
while (st.fetch (error));
}
}
if (! error)
{
std::string s;
int valence;
sqdb::statement st = (m_session.prepare <<
"INSERT INTO PeerFinder_BootstrapCache_Next ( "
" address, "
" valence "
") VALUES ( "
" ?, ?"
");"
, sqdb::use (s)
, sqdb::use (valence)
);
for (auto iter (list.cbegin());
!error && iter != list.cend(); ++iter)
{
s = to_string (iter->endpoint);
valence = iter->valence;
st.execute_and_fetch (error);
}
}
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_BootstrapCache";
if (! error)
m_session.once (error) <<
"DROP INDEX IF EXISTS PeerFinder_BootstrapCache_Index";
if (! error)
m_session.once (error) <<
"ALTER TABLE PeerFinder_BootstrapCache_Next "
" RENAME TO PeerFinder_BootstrapCache";
if (! error)
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Index ON PeerFinder_BootstrapCache "
" ( "
" address "
" ); "
;
}
if (!error)
if (! error && version < 3)
{
//
// Remove legacy endpoints from the schema
//
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS LegacyEndpoints";
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinderLegacyEndpoints";
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints";
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints_Index";
}
if (! error)
{
int const version (currentSchemaVersion);
m_session.once (error) <<
@@ -264,7 +343,7 @@ public:
,sqdb::use(version);
}
if (!error)
if (! error)
error = tr.commit();
if (error)
@@ -283,35 +362,27 @@ private:
sqdb::transaction tr (m_session);
if (! error)
{
m_session.once (error) <<
"PRAGMA encoding=\"UTF-8\"";
}
if (! error)
{
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS SchemaVersion ( "
" name TEXT PRIMARY KEY, "
" version INTEGER"
");"
;
}
if (! error)
{
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" address TEXT UNIQUE NOT NULL, "
" uptime INTEGER,"
" valence INTEGER"
");"
;
}
if (! error)
{
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Index ON PeerFinder_BootstrapCache "
@@ -319,12 +390,9 @@ private:
" address "
" ); "
;
}
if (! error)
{
error = tr.commit();
}
if (error)
{

View File

@@ -58,18 +58,6 @@ enum
/** The default value of Config::maxPeers. */
,defaultMaxPeers = 21
//---------------------------------------------------------
//
// LegacyEndpoints
//
//---------------------------------------------------------
// How many legacy endpoints to keep in our cache
,legacyEndpointCacheSize = 1000
// How many cache mutations between each database update
,legacyEndpointMutationsPerUpdate = 50
};
//------------------------------------------------------------------------------
@@ -111,21 +99,17 @@ static std::chrono::seconds const bootcacheCooldownTime (60);
enum
{
// Drop incoming messages with hops greater than this number
maxHops = 10
maxHops = 6
// How many Endpoint to send in each mtENDPOINTS
,numberOfEndpoints = 10
,numberOfEndpoints = 2 * maxHops
// The most Endpoint we will accept in mtENDPOINTS
,numberOfEndpointsMax = 20
// The maximum number of hops that we allow. Peers farther
// away than this are dropped.
,maxPeerHopCount = 10
// The number of peers that we want by default, unless an
// explicit value is set in the config file.
,defaultMaxPeerCount = 20
,defaultMaxPeerCount = 21
/** Number of addresses we provide when redirecting. */
,redirectEndpointCount = 10
@@ -136,7 +120,16 @@ static std::chrono::seconds const secondsPerMessage (5);
// How long an Endpoint will stay in the cache
// This should be a small multiple of the broadcast frequency
static std::chrono::seconds const liveCacheSecondsToLive (60);
static std::chrono::seconds const liveCacheSecondsToLive (30);
//
//
//
// How much time to wait before trying an outgoing address again.
// Note that we ignore the port for purposes of comparison.
static std::chrono::seconds const recentAttemptDuration (60);
}
/** @} */

View File

@@ -0,0 +1,88 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_HANDOUT_H_INCLUDED
#define RIPPLE_PEERFINDER_HANDOUT_H_INCLUDED
namespace ripple {
namespace PeerFinder {
namespace detail {
/** Tries to insert one object in the target.
When an item is handed out it is moved to the end of the container.
@return The number of objects handed out
*/
// VFALCO TODO specialization that handles std::list for SequenceContainer
// using splice for optimization over erase/push_back
//
template <class Target, class HopContainer>
std::size_t handout_one (Target& t, HopContainer& h)
{
assert (! t.full());
for (auto hi (h.begin()); hi != h.end(); ++hi)
{
auto const& e (*hi);
if (t.try_insert (e))
{
h.move_back (hi);
return 1;
}
}
return 0;
}
}
/** Distributes objects to targets according to business rules.
A best effort is made to evenly distribute items in the sequence
container list into the target sequence list.
*/
template <class TargetFwdIter, class SeqFwdIter>
void handout (TargetFwdIter first, TargetFwdIter last,
SeqFwdIter seq_first, SeqFwdIter seq_last)
{
for (;;)
{
std::size_t n (0);
for (auto si (seq_first); si != seq_last; ++si)
{
auto c (*si);
bool all_full (true);
for (auto ti (first); ti != last; ++ti)
{
auto& t (*ti);
if (! t.full())
{
n += detail::handout_one (t, c);
all_full = false;
}
}
if (all_full)
return;
}
if (! n)
break;
}
}
}
}
#endif

View File

@@ -56,38 +56,37 @@ using namespace beast;
#endif
#include "impl/PrivateTypes.h"
# include "impl/Tuning.h"
# include "impl/Checker.h"
#include "impl/Tuning.h"
#include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
# include "impl/Sorts.h"
# include "impl/Giveaways.h"
# include "impl/Livecache.h"
# include "impl/SlotImp.h"
# include "impl/Counts.h"
# include "impl/Source.h"
#include "impl/Livecache.h"
#include "impl/SlotImp.h"
#include "impl/Counts.h"
#include "impl/Source.h"
#include "impl/SourceStrings.h"
# include "impl/Store.h"
# include "impl/Bootcache.h"
//# include "impl/Peer.h"
#include "impl/Store.h"
#include "impl/Bootcache.h"
#include "impl/StoreSqdb.h"
# include "impl/Reporting.h"
#include "impl/Reporting.h"
#include "impl/Logic.h"
#include "impl/Bootcache.cpp"
#include "impl/Checker.cpp"
#include "impl/Config.cpp"
#include "impl/ConnectHandouts.cpp"
#include "impl/Endpoint.cpp"
#include "impl/Livecache.cpp"
#include "impl/Manager.cpp"
#include "impl/RedirectHandouts.cpp"
#include "impl/SlotHandouts.cpp"
#include "impl/SlotImp.cpp"
#include "impl/SourceStrings.cpp"
//#include "sim/sync_timer.h"
#include "sim/GraphAlgorithms.h"
#include "sim/WrappedSink.h"
# include "sim/Predicates.h"
# include "sim/FunctionQueue.h"
# include "sim/Message.h"
# include "sim/NodeSnapshot.h"
# include "sim/Params.h"
#include "sim/Predicates.h"
#include "sim/FunctionQueue.h"
#include "sim/Message.h"
#include "sim/NodeSnapshot.h"
#include "sim/Params.h"
#include "sim/Tests.cpp"

View File

@@ -189,7 +189,7 @@ public:
, m_next_port (m_config.listening_endpoint.port() + 1)
, m_logic (boost::in_place (
boost::ref (clock), boost::ref (*this), boost::ref (*this), boost::ref (*this), m_journal))
, m_whenSweep (m_network.now() + Tuning::liveCacheSecondsToLive)
, m_when_expire (m_network.now() + std::chrono::seconds (1))
{
logic().setConfig (m_config.config);
logic().load ();
@@ -285,10 +285,10 @@ public:
logic().makeOutgoingConnections ();
logic().sendEndpoints ();
if (m_network.now() >= m_whenSweep)
if (m_network.now() >= m_when_expire)
{
logic().sweepCache();
m_whenSweep = m_network.now() + Tuning::liveCacheSecondsToLive;
logic().expire();
m_when_expire = m_network.now() + std::chrono::seconds (1);
}
m_livecache_history.emplace_back (
@@ -533,7 +533,7 @@ private:
Journal m_journal;
IP::Port m_next_port;
boost::optional <Logic> m_logic;
clock_type::time_point m_whenSweep;
clock_type::time_point m_when_expire;
SavedBootstrapAddresses m_bootstrap_cache;
};

View File

@@ -64,8 +64,6 @@ public:
virtual Json::Value json () = 0;
virtual bool isConnected () const = 0;
virtual bool isInCluster () const = 0;
virtual std::string getClusterNodeName() const = 0;

View File

@@ -240,8 +240,6 @@ struct match_peer
bool operator() (Peer::ref peer) const
{
bassert(peer->isConnected());
if(matchPeer && (peer.get () == matchPeer))
return true;
@@ -285,8 +283,6 @@ struct peer_in_set
bool operator() (Peer::ref peer) const
{
bassert(peer->isConnected());
if (peerSet.count (peer->getShortId ()) == 0)
return false;

View File

@@ -560,11 +560,6 @@ public:
return ret;
}
bool isConnected () const
{
// CHECKME should this be stateActive or something else?
return (m_state == stateActive) && !m_detaching;
}
bool isInCluster () const
{
return m_clusterNode;

View File

@@ -316,7 +316,6 @@ public:
void send (PeerFinder::Slot::ptr const& slot,
std::vector <PeerFinder::Endpoint> const& endpoints)
{
bassert (! endpoints.empty());
typedef std::vector <PeerFinder::Endpoint> List;
protocol::TMEndpoints tm;
for (List::const_iterator iter (endpoints.begin());
@@ -346,10 +345,7 @@ public:
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
// VFALCO TODO Why are we checking isConnected?
// That should not be needed
if (peer->isConnected())
peer->sendPacket (msg, false);
peer->sendPacket (msg, false);
}
}