Improve handling of endpoints during peer discovery

This commit is contained in:
Nik Bougalis
2022-01-18 14:39:57 -08:00
parent c5dc00af74
commit 289bc0afd9
10 changed files with 75 additions and 73 deletions

View File

@@ -34,11 +34,14 @@ Endpoint::Endpoint(Address const& addr, Port port) : m_addr(addr), m_port(port)
std::optional<Endpoint>
Endpoint::from_string_checked(std::string const& s)
{
std::stringstream is(boost::trim_copy(s));
Endpoint endpoint;
is >> endpoint;
if (!is.fail() && is.rdbuf()->in_avail() == 0)
return endpoint;
if (s.size() <= 64)
{
std::stringstream is(boost::trim_copy(s));
Endpoint endpoint;
is >> endpoint;
if (!is.fail() && is.rdbuf()->in_avail() == 0)
return endpoint;
}
return {};
}

View File

@@ -1480,16 +1480,26 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
if (tracking_.load() != Tracking::converged || m->version() != 2)
return;
// The number is arbitrary and doesn't have any real significance or
// implication for the protocol.
if (m->endpoints_v2().size() >= 1024)
{
charge(Resource::feeBadData);
return;
}
std::vector<PeerFinder::Endpoint> endpoints;
endpoints.reserve(m->endpoints_v2().size());
for (auto const& tm : m->endpoints_v2())
{
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
if (!result)
{
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}";
charge(Resource::feeBadData);
continue;
}
@@ -1499,10 +1509,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
// time, then we'll verify that their listener can receive incoming
// by performing a connectivity test. if hops > 0, then we just
// take the address/port we were given
if (tm.hops() == 0)
result = remote_address_.at_port(result->port());
endpoints.emplace_back(
tm.hops() > 0 ? *result : remote_address_.at_port(result->port()),
tm.hops());
endpoints.emplace_back(*result, tm.hops());
}
if (!endpoints.empty())

View File

@@ -112,16 +112,19 @@ struct Config
/** Describes a connectible peer address along with some metadata. */
struct Endpoint
{
Endpoint();
Endpoint() = default;
Endpoint(beast::IP::Endpoint const& ep, int hops_);
Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_);
int hops;
std::uint32_t hops = 0;
beast::IP::Endpoint address;
};
bool
operator<(Endpoint const& lhs, Endpoint const& rhs);
inline bool
operator<(Endpoint const& lhs, Endpoint const& rhs)
{
return lhs.address < rhs.address;
}
/** A set of Endpoint used for connecting. */
using Endpoints = std::vector<Endpoint>;

View File

@@ -18,24 +18,15 @@
//==============================================================================
#include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/peerfinder/impl/Tuning.h>
namespace ripple {
namespace PeerFinder {
Endpoint::Endpoint() : hops(0)
Endpoint::Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_)
: hops(std::min(hops_, Tuning::maxHops + 1)), address(ep)
{
}
Endpoint::Endpoint(beast::IP::Endpoint const& ep, int hops_)
: hops(hops_), address(ep)
{
}
bool
operator<(Endpoint const& lhs, Endpoint const& rhs)
{
return lhs.address < rhs.address;
}
} // namespace PeerFinder
} // namespace ripple

View File

@@ -363,7 +363,7 @@ public:
// Reinsert e at a new hops
void
reinsert(Element& e, int hops);
reinsert(Element& e, std::uint32_t hops);
void
remove(Element& e);
@@ -444,8 +444,7 @@ Livecache<Allocator>::insert(Endpoint const& ep)
// when redirecting.
//
assert(ep.hops <= (Tuning::maxHops + 1));
std::pair<typename cache_type::iterator, bool> result(
m_cache.emplace(ep.address, ep));
auto result = m_cache.emplace(ep.address, ep);
Element& e(result.first->second);
if (result.second)
{
@@ -522,12 +521,14 @@ template <class Allocator>
std::string
Livecache<Allocator>::hops_t::histogram() const
{
std::stringstream ss;
for (typename decltype(m_hist)::size_type i(0); i < m_hist.size(); ++i)
std::string s;
for (auto const& h : m_hist)
{
ss << m_hist[i] << ((i < Tuning::maxHops + 1) ? ", " : "");
if (!s.empty())
s += ", ";
s += std::to_string(h);
}
return ss.str();
return s;
}
template <class Allocator>
@@ -540,7 +541,7 @@ template <class Allocator>
void
Livecache<Allocator>::hops_t::insert(Element& e)
{
assert(e.endpoint.hops >= 0 && e.endpoint.hops <= Tuning::maxHops + 1);
assert(e.endpoint.hops <= Tuning::maxHops + 1);
// This has security implications without a shuffle
m_lists[e.endpoint.hops].push_front(e);
++m_hist[e.endpoint.hops];
@@ -548,11 +549,13 @@ Livecache<Allocator>::hops_t::insert(Element& e)
template <class Allocator>
void
Livecache<Allocator>::hops_t::reinsert(Element& e, int numHops)
Livecache<Allocator>::hops_t::reinsert(Element& e, std::uint32_t numHops)
{
assert(numHops >= 0 && numHops <= Tuning::maxHops + 1);
list_type& list(m_lists[e.endpoint.hops]);
assert(numHops <= Tuning::maxHops + 1);
auto& list = m_lists[e.endpoint.hops];
list.erase(list.iterator_to(e));
--m_hist[e.endpoint.hops];
e.endpoint.hops = numHops;
@@ -564,7 +567,8 @@ void
Livecache<Allocator>::hops_t::remove(Element& e)
{
--m_hist[e.endpoint.hops];
list_type& list(m_lists[e.endpoint.hops]);
auto& list = m_lists[e.endpoint.hops];
list.erase(list.iterator_to(e));
}

View File

@@ -762,6 +762,13 @@ public:
void
on_endpoints(SlotImp::ptr const& slot, Endpoints list)
{
// If we're sent too many endpoints, sample them at random:
if (list.size() > Tuning::numberOfEndpointsMax)
{
std::shuffle(list.begin(), list.end(), default_prng());
list.resize(Tuning::numberOfEndpointsMax);
}
JLOG(m_journal.trace())
<< beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
<< " contained " << list.size()

View File

@@ -102,7 +102,7 @@ SlotImp::recent_t::recent_t(clock_type& clock) : cache(clock)
}
void
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, std::uint32_t hops)
{
auto const result(cache.emplace(ep, hops));
if (!result.second)
@@ -117,7 +117,7 @@ SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
}
bool
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, int hops)
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, std::uint32_t hops)
{
auto const iter(cache.find(ep));
if (iter == cache.end())

View File

@@ -32,9 +32,6 @@ namespace PeerFinder {
class SlotImp : public Slot
{
private:
using recent_type = beast::aged_unordered_map<beast::IP::Endpoint, int>;
public:
using ptr = std::shared_ptr<SlotImp>;
@@ -155,18 +152,18 @@ public:
sending a slot the same address too frequently.
*/
void
insert(beast::IP::Endpoint const& ep, int hops);
insert(beast::IP::Endpoint const& ep, std::uint32_t hops);
/** Returns `true` if we should not send endpoint to the slot. */
bool
filter(beast::IP::Endpoint const& ep, int hops);
filter(beast::IP::Endpoint const& ep, std::uint32_t hops);
private:
void
expire();
friend class SlotImp;
recent_type cache;
beast::aged_unordered_map<beast::IP::Endpoint, std::uint32_t> cache;
} recent;
void

View File

@@ -106,43 +106,30 @@ static std::chrono::seconds const bootcacheCooldownTime(60);
//
//------------------------------------------------------------------------------
enum {
// Drop incoming messages with hops greater than this number
maxHops = 6
// Drop incoming messages with hops greater than this number
std::uint32_t constexpr maxHops = 6;
// How many Endpoint to send in each mtENDPOINTS
,
numberOfEndpoints = 2 * maxHops
// How many Endpoint to send in each mtENDPOINTS
std::uint32_t constexpr numberOfEndpoints = 2 * maxHops;
// The most Endpoint we will accept in mtENDPOINTS
,
numberOfEndpointsMax = 20
// The most Endpoint we will accept in mtENDPOINTS
std::uint32_t constexpr numberOfEndpointsMax =
std::max<decltype(numberOfEndpoints)>(numberOfEndpoints * 2, 64);
// The number of peers that we want by default, unless an
// explicit value is set in the config file.
,
defaultMaxPeerCount = 21
/** Number of addresses we provide when redirecting. */
,
redirectEndpointCount = 10
};
// Number of addresses we provide when redirecting.
std::uint32_t constexpr redirectEndpointCount = 10;
// How often we send or accept mtENDPOINTS messages per peer
// (we use a prime number of purpose)
static std::chrono::seconds const secondsPerMessage(61);
std::chrono::seconds constexpr secondsPerMessage(151);
// How long an Endpoint will stay in the cache
// This should be a small multiple of the broadcast frequency
static std::chrono::seconds const liveCacheSecondsToLive(30);
//
//
//
std::chrono::seconds constexpr liveCacheSecondsToLive(30);
// How much time to wait before trying an outgoing address again.
// Note that we ignore the port for purposes of comparison.
static std::chrono::seconds const recentAttemptDuration(60);
std::chrono::seconds constexpr recentAttemptDuration(60);
} // namespace Tuning
/** @} */

View File

@@ -48,7 +48,7 @@ public:
// Add the address as an endpoint
template <class C>
inline void
add(beast::IP::Endpoint ep, C& c, int hops = 0)
add(beast::IP::Endpoint ep, C& c, std::uint32_t hops = 0)
{
Endpoint cep{ep, hops};
c.insert(cep);
@@ -139,7 +139,7 @@ public:
for (auto i = 0; i < num_eps; ++i)
add(beast::IP::randomEP(true),
c,
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
ripple::rand_int<std::uint32_t>());
auto h = c.hops.histogram();
if (!BEAST_EXPECT(!h.empty()))
return;
@@ -163,7 +163,7 @@ public:
for (auto i = 0; i < 100; ++i)
add(beast::IP::randomEP(true),
c,
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
ripple::rand_int(Tuning::maxHops + 1));
using at_hop = std::vector<ripple::PeerFinder::Endpoint>;
using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>;