Improve handling of endpoints during peer discovery

This commit is contained in:
Nik Bougalis
2022-01-18 14:39:57 -08:00
parent c5dc00af74
commit 289bc0afd9
10 changed files with 75 additions and 73 deletions

View File

@@ -33,12 +33,15 @@ Endpoint::Endpoint(Address const& addr, Port port) : m_addr(addr), m_port(port)
std::optional<Endpoint> std::optional<Endpoint>
Endpoint::from_string_checked(std::string const& s) Endpoint::from_string_checked(std::string const& s)
{
if (s.size() <= 64)
{ {
std::stringstream is(boost::trim_copy(s)); std::stringstream is(boost::trim_copy(s));
Endpoint endpoint; Endpoint endpoint;
is >> endpoint; is >> endpoint;
if (!is.fail() && is.rdbuf()->in_avail() == 0) if (!is.fail() && is.rdbuf()->in_avail() == 0)
return endpoint; return endpoint;
}
return {}; return {};
} }

View File

@@ -1480,16 +1480,26 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
if (tracking_.load() != Tracking::converged || m->version() != 2) if (tracking_.load() != Tracking::converged || m->version() != 2)
return; return;
// The number is arbitrary and doesn't have any real significance or
// implication for the protocol.
if (m->endpoints_v2().size() >= 1024)
{
charge(Resource::feeBadData);
return;
}
std::vector<PeerFinder::Endpoint> endpoints; std::vector<PeerFinder::Endpoint> endpoints;
endpoints.reserve(m->endpoints_v2().size()); endpoints.reserve(m->endpoints_v2().size());
for (auto const& tm : m->endpoints_v2()) for (auto const& tm : m->endpoints_v2())
{ {
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint()); auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
if (!result) if (!result)
{ {
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {" JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}"; << tm.endpoint() << "}";
charge(Resource::feeBadData);
continue; continue;
} }
@@ -1499,10 +1509,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
// time, then we'll verify that their listener can receive incoming // time, then we'll verify that their listener can receive incoming
// by performing a connectivity test. if hops > 0, then we just // by performing a connectivity test. if hops > 0, then we just
// take the address/port we were given // take the address/port we were given
if (tm.hops() == 0)
result = remote_address_.at_port(result->port());
endpoints.emplace_back( endpoints.emplace_back(*result, tm.hops());
tm.hops() > 0 ? *result : remote_address_.at_port(result->port()),
tm.hops());
} }
if (!endpoints.empty()) if (!endpoints.empty())

View File

@@ -112,16 +112,19 @@ struct Config
/** Describes a connectible peer address along with some metadata. */ /** Describes a connectible peer address along with some metadata. */
struct Endpoint struct Endpoint
{ {
Endpoint(); Endpoint() = default;
Endpoint(beast::IP::Endpoint const& ep, int hops_); Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_);
int hops; std::uint32_t hops = 0;
beast::IP::Endpoint address; beast::IP::Endpoint address;
}; };
bool inline bool
operator<(Endpoint const& lhs, Endpoint const& rhs); operator<(Endpoint const& lhs, Endpoint const& rhs)
{
return lhs.address < rhs.address;
}
/** A set of Endpoint used for connecting. */ /** A set of Endpoint used for connecting. */
using Endpoints = std::vector<Endpoint>; using Endpoints = std::vector<Endpoint>;

View File

@@ -18,24 +18,15 @@
//============================================================================== //==============================================================================
#include <ripple/peerfinder/PeerfinderManager.h> #include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/peerfinder/impl/Tuning.h>
namespace ripple { namespace ripple {
namespace PeerFinder { namespace PeerFinder {
Endpoint::Endpoint() : hops(0) Endpoint::Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_)
: hops(std::min(hops_, Tuning::maxHops + 1)), address(ep)
{ {
} }
Endpoint::Endpoint(beast::IP::Endpoint const& ep, int hops_)
: hops(hops_), address(ep)
{
}
bool
operator<(Endpoint const& lhs, Endpoint const& rhs)
{
return lhs.address < rhs.address;
}
} // namespace PeerFinder } // namespace PeerFinder
} // namespace ripple } // namespace ripple

View File

@@ -363,7 +363,7 @@ public:
// Reinsert e at a new hops // Reinsert e at a new hops
void void
reinsert(Element& e, int hops); reinsert(Element& e, std::uint32_t hops);
void void
remove(Element& e); remove(Element& e);
@@ -444,8 +444,7 @@ Livecache<Allocator>::insert(Endpoint const& ep)
// when redirecting. // when redirecting.
// //
assert(ep.hops <= (Tuning::maxHops + 1)); assert(ep.hops <= (Tuning::maxHops + 1));
std::pair<typename cache_type::iterator, bool> result( auto result = m_cache.emplace(ep.address, ep);
m_cache.emplace(ep.address, ep));
Element& e(result.first->second); Element& e(result.first->second);
if (result.second) if (result.second)
{ {
@@ -522,12 +521,14 @@ template <class Allocator>
std::string std::string
Livecache<Allocator>::hops_t::histogram() const Livecache<Allocator>::hops_t::histogram() const
{ {
std::stringstream ss; std::string s;
for (typename decltype(m_hist)::size_type i(0); i < m_hist.size(); ++i) for (auto const& h : m_hist)
{ {
ss << m_hist[i] << ((i < Tuning::maxHops + 1) ? ", " : ""); if (!s.empty())
s += ", ";
s += std::to_string(h);
} }
return ss.str(); return s;
} }
template <class Allocator> template <class Allocator>
@@ -540,7 +541,7 @@ template <class Allocator>
void void
Livecache<Allocator>::hops_t::insert(Element& e) Livecache<Allocator>::hops_t::insert(Element& e)
{ {
assert(e.endpoint.hops >= 0 && e.endpoint.hops <= Tuning::maxHops + 1); assert(e.endpoint.hops <= Tuning::maxHops + 1);
// This has security implications without a shuffle // This has security implications without a shuffle
m_lists[e.endpoint.hops].push_front(e); m_lists[e.endpoint.hops].push_front(e);
++m_hist[e.endpoint.hops]; ++m_hist[e.endpoint.hops];
@@ -548,11 +549,13 @@ Livecache<Allocator>::hops_t::insert(Element& e)
template <class Allocator> template <class Allocator>
void void
Livecache<Allocator>::hops_t::reinsert(Element& e, int numHops) Livecache<Allocator>::hops_t::reinsert(Element& e, std::uint32_t numHops)
{ {
assert(numHops >= 0 && numHops <= Tuning::maxHops + 1); assert(numHops <= Tuning::maxHops + 1);
list_type& list(m_lists[e.endpoint.hops]);
auto& list = m_lists[e.endpoint.hops];
list.erase(list.iterator_to(e)); list.erase(list.iterator_to(e));
--m_hist[e.endpoint.hops]; --m_hist[e.endpoint.hops];
e.endpoint.hops = numHops; e.endpoint.hops = numHops;
@@ -564,7 +567,8 @@ void
Livecache<Allocator>::hops_t::remove(Element& e) Livecache<Allocator>::hops_t::remove(Element& e)
{ {
--m_hist[e.endpoint.hops]; --m_hist[e.endpoint.hops];
list_type& list(m_lists[e.endpoint.hops]);
auto& list = m_lists[e.endpoint.hops];
list.erase(list.iterator_to(e)); list.erase(list.iterator_to(e));
} }

View File

@@ -762,6 +762,13 @@ public:
void void
on_endpoints(SlotImp::ptr const& slot, Endpoints list) on_endpoints(SlotImp::ptr const& slot, Endpoints list)
{ {
// If we're sent too many endpoints, sample them at random:
if (list.size() > Tuning::numberOfEndpointsMax)
{
std::shuffle(list.begin(), list.end(), default_prng());
list.resize(Tuning::numberOfEndpointsMax);
}
JLOG(m_journal.trace()) JLOG(m_journal.trace())
<< beast::leftw(18) << "Endpoints from " << slot->remote_endpoint() << beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
<< " contained " << list.size() << " contained " << list.size()

View File

@@ -102,7 +102,7 @@ SlotImp::recent_t::recent_t(clock_type& clock) : cache(clock)
} }
void void
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops) SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, std::uint32_t hops)
{ {
auto const result(cache.emplace(ep, hops)); auto const result(cache.emplace(ep, hops));
if (!result.second) if (!result.second)
@@ -117,7 +117,7 @@ SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
} }
bool bool
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, int hops) SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, std::uint32_t hops)
{ {
auto const iter(cache.find(ep)); auto const iter(cache.find(ep));
if (iter == cache.end()) if (iter == cache.end())

View File

@@ -32,9 +32,6 @@ namespace PeerFinder {
class SlotImp : public Slot class SlotImp : public Slot
{ {
private:
using recent_type = beast::aged_unordered_map<beast::IP::Endpoint, int>;
public: public:
using ptr = std::shared_ptr<SlotImp>; using ptr = std::shared_ptr<SlotImp>;
@@ -155,18 +152,18 @@ public:
sending a slot the same address too frequently. sending a slot the same address too frequently.
*/ */
void void
insert(beast::IP::Endpoint const& ep, int hops); insert(beast::IP::Endpoint const& ep, std::uint32_t hops);
/** Returns `true` if we should not send endpoint to the slot. */ /** Returns `true` if we should not send endpoint to the slot. */
bool bool
filter(beast::IP::Endpoint const& ep, int hops); filter(beast::IP::Endpoint const& ep, std::uint32_t hops);
private: private:
void void
expire(); expire();
friend class SlotImp; friend class SlotImp;
recent_type cache; beast::aged_unordered_map<beast::IP::Endpoint, std::uint32_t> cache;
} recent; } recent;
void void

View File

@@ -106,43 +106,30 @@ static std::chrono::seconds const bootcacheCooldownTime(60);
// //
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
enum {
// Drop incoming messages with hops greater than this number // Drop incoming messages with hops greater than this number
maxHops = 6 std::uint32_t constexpr maxHops = 6;
// How many Endpoint to send in each mtENDPOINTS // How many Endpoint to send in each mtENDPOINTS
, std::uint32_t constexpr numberOfEndpoints = 2 * maxHops;
numberOfEndpoints = 2 * maxHops
// The most Endpoint we will accept in mtENDPOINTS // The most Endpoint we will accept in mtENDPOINTS
, std::uint32_t constexpr numberOfEndpointsMax =
numberOfEndpointsMax = 20 std::max<decltype(numberOfEndpoints)>(numberOfEndpoints * 2, 64);
// The number of peers that we want by default, unless an // Number of addresses we provide when redirecting.
// explicit value is set in the config file. std::uint32_t constexpr redirectEndpointCount = 10;
,
defaultMaxPeerCount = 21
/** Number of addresses we provide when redirecting. */
,
redirectEndpointCount = 10
};
// How often we send or accept mtENDPOINTS messages per peer // How often we send or accept mtENDPOINTS messages per peer
// (we use a prime number of purpose) // (we use a prime number of purpose)
static std::chrono::seconds const secondsPerMessage(61); std::chrono::seconds constexpr secondsPerMessage(151);
// How long an Endpoint will stay in the cache // How long an Endpoint will stay in the cache
// This should be a small multiple of the broadcast frequency // This should be a small multiple of the broadcast frequency
static std::chrono::seconds const liveCacheSecondsToLive(30); std::chrono::seconds constexpr liveCacheSecondsToLive(30);
//
//
//
// How much time to wait before trying an outgoing address again. // How much time to wait before trying an outgoing address again.
// Note that we ignore the port for purposes of comparison. // Note that we ignore the port for purposes of comparison.
static std::chrono::seconds const recentAttemptDuration(60); std::chrono::seconds constexpr recentAttemptDuration(60);
} // namespace Tuning } // namespace Tuning
/** @} */ /** @} */

View File

@@ -48,7 +48,7 @@ public:
// Add the address as an endpoint // Add the address as an endpoint
template <class C> template <class C>
inline void inline void
add(beast::IP::Endpoint ep, C& c, int hops = 0) add(beast::IP::Endpoint ep, C& c, std::uint32_t hops = 0)
{ {
Endpoint cep{ep, hops}; Endpoint cep{ep, hops};
c.insert(cep); c.insert(cep);
@@ -139,7 +139,7 @@ public:
for (auto i = 0; i < num_eps; ++i) for (auto i = 0; i < num_eps; ++i)
add(beast::IP::randomEP(true), add(beast::IP::randomEP(true),
c, c,
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1))); ripple::rand_int<std::uint32_t>());
auto h = c.hops.histogram(); auto h = c.hops.histogram();
if (!BEAST_EXPECT(!h.empty())) if (!BEAST_EXPECT(!h.empty()))
return; return;
@@ -163,7 +163,7 @@ public:
for (auto i = 0; i < 100; ++i) for (auto i = 0; i < 100; ++i)
add(beast::IP::randomEP(true), add(beast::IP::randomEP(true),
c, c,
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1))); ripple::rand_int(Tuning::maxHops + 1));
using at_hop = std::vector<ripple::PeerFinder::Endpoint>; using at_hop = std::vector<ripple::PeerFinder::Endpoint>;
using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>; using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>;