mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-21 11:35:53 +00:00
Improve handling of endpoints during peer discovery
This commit is contained in:
@@ -34,11 +34,14 @@ Endpoint::Endpoint(Address const& addr, Port port) : m_addr(addr), m_port(port)
|
||||
std::optional<Endpoint>
|
||||
Endpoint::from_string_checked(std::string const& s)
|
||||
{
|
||||
std::stringstream is(boost::trim_copy(s));
|
||||
Endpoint endpoint;
|
||||
is >> endpoint;
|
||||
if (!is.fail() && is.rdbuf()->in_avail() == 0)
|
||||
return endpoint;
|
||||
if (s.size() <= 64)
|
||||
{
|
||||
std::stringstream is(boost::trim_copy(s));
|
||||
Endpoint endpoint;
|
||||
is >> endpoint;
|
||||
if (!is.fail() && is.rdbuf()->in_avail() == 0)
|
||||
return endpoint;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
@@ -1480,16 +1480,26 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
||||
if (tracking_.load() != Tracking::converged || m->version() != 2)
|
||||
return;
|
||||
|
||||
// The number is arbitrary and doesn't have any real significance or
|
||||
// implication for the protocol.
|
||||
if (m->endpoints_v2().size() >= 1024)
|
||||
{
|
||||
charge(Resource::feeBadData);
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<PeerFinder::Endpoint> endpoints;
|
||||
endpoints.reserve(m->endpoints_v2().size());
|
||||
|
||||
for (auto const& tm : m->endpoints_v2())
|
||||
{
|
||||
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
|
||||
|
||||
if (!result)
|
||||
{
|
||||
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
|
||||
<< tm.endpoint() << "}";
|
||||
charge(Resource::feeBadData);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1499,10 +1509,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
||||
// time, then we'll verify that their listener can receive incoming
|
||||
// by performing a connectivity test. if hops > 0, then we just
|
||||
// take the address/port we were given
|
||||
if (tm.hops() == 0)
|
||||
result = remote_address_.at_port(result->port());
|
||||
|
||||
endpoints.emplace_back(
|
||||
tm.hops() > 0 ? *result : remote_address_.at_port(result->port()),
|
||||
tm.hops());
|
||||
endpoints.emplace_back(*result, tm.hops());
|
||||
}
|
||||
|
||||
if (!endpoints.empty())
|
||||
|
||||
@@ -112,16 +112,19 @@ struct Config
|
||||
/** Describes a connectible peer address along with some metadata. */
|
||||
struct Endpoint
|
||||
{
|
||||
Endpoint();
|
||||
Endpoint() = default;
|
||||
|
||||
Endpoint(beast::IP::Endpoint const& ep, int hops_);
|
||||
Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_);
|
||||
|
||||
int hops;
|
||||
std::uint32_t hops = 0;
|
||||
beast::IP::Endpoint address;
|
||||
};
|
||||
|
||||
bool
|
||||
operator<(Endpoint const& lhs, Endpoint const& rhs);
|
||||
inline bool
|
||||
operator<(Endpoint const& lhs, Endpoint const& rhs)
|
||||
{
|
||||
return lhs.address < rhs.address;
|
||||
}
|
||||
|
||||
/** A set of Endpoint used for connecting. */
|
||||
using Endpoints = std::vector<Endpoint>;
|
||||
|
||||
@@ -18,24 +18,15 @@
|
||||
//==============================================================================
|
||||
|
||||
#include <ripple/peerfinder/PeerfinderManager.h>
|
||||
#include <ripple/peerfinder/impl/Tuning.h>
|
||||
|
||||
namespace ripple {
|
||||
namespace PeerFinder {
|
||||
|
||||
Endpoint::Endpoint() : hops(0)
|
||||
Endpoint::Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_)
|
||||
: hops(std::min(hops_, Tuning::maxHops + 1)), address(ep)
|
||||
{
|
||||
}
|
||||
|
||||
Endpoint::Endpoint(beast::IP::Endpoint const& ep, int hops_)
|
||||
: hops(hops_), address(ep)
|
||||
{
|
||||
}
|
||||
|
||||
bool
|
||||
operator<(Endpoint const& lhs, Endpoint const& rhs)
|
||||
{
|
||||
return lhs.address < rhs.address;
|
||||
}
|
||||
|
||||
} // namespace PeerFinder
|
||||
} // namespace ripple
|
||||
|
||||
@@ -363,7 +363,7 @@ public:
|
||||
|
||||
// Reinsert e at a new hops
|
||||
void
|
||||
reinsert(Element& e, int hops);
|
||||
reinsert(Element& e, std::uint32_t hops);
|
||||
|
||||
void
|
||||
remove(Element& e);
|
||||
@@ -444,8 +444,7 @@ Livecache<Allocator>::insert(Endpoint const& ep)
|
||||
// when redirecting.
|
||||
//
|
||||
assert(ep.hops <= (Tuning::maxHops + 1));
|
||||
std::pair<typename cache_type::iterator, bool> result(
|
||||
m_cache.emplace(ep.address, ep));
|
||||
auto result = m_cache.emplace(ep.address, ep);
|
||||
Element& e(result.first->second);
|
||||
if (result.second)
|
||||
{
|
||||
@@ -522,12 +521,14 @@ template <class Allocator>
|
||||
std::string
|
||||
Livecache<Allocator>::hops_t::histogram() const
|
||||
{
|
||||
std::stringstream ss;
|
||||
for (typename decltype(m_hist)::size_type i(0); i < m_hist.size(); ++i)
|
||||
std::string s;
|
||||
for (auto const& h : m_hist)
|
||||
{
|
||||
ss << m_hist[i] << ((i < Tuning::maxHops + 1) ? ", " : "");
|
||||
if (!s.empty())
|
||||
s += ", ";
|
||||
s += std::to_string(h);
|
||||
}
|
||||
return ss.str();
|
||||
return s;
|
||||
}
|
||||
|
||||
template <class Allocator>
|
||||
@@ -540,7 +541,7 @@ template <class Allocator>
|
||||
void
|
||||
Livecache<Allocator>::hops_t::insert(Element& e)
|
||||
{
|
||||
assert(e.endpoint.hops >= 0 && e.endpoint.hops <= Tuning::maxHops + 1);
|
||||
assert(e.endpoint.hops <= Tuning::maxHops + 1);
|
||||
// This has security implications without a shuffle
|
||||
m_lists[e.endpoint.hops].push_front(e);
|
||||
++m_hist[e.endpoint.hops];
|
||||
@@ -548,11 +549,13 @@ Livecache<Allocator>::hops_t::insert(Element& e)
|
||||
|
||||
template <class Allocator>
|
||||
void
|
||||
Livecache<Allocator>::hops_t::reinsert(Element& e, int numHops)
|
||||
Livecache<Allocator>::hops_t::reinsert(Element& e, std::uint32_t numHops)
|
||||
{
|
||||
assert(numHops >= 0 && numHops <= Tuning::maxHops + 1);
|
||||
list_type& list(m_lists[e.endpoint.hops]);
|
||||
assert(numHops <= Tuning::maxHops + 1);
|
||||
|
||||
auto& list = m_lists[e.endpoint.hops];
|
||||
list.erase(list.iterator_to(e));
|
||||
|
||||
--m_hist[e.endpoint.hops];
|
||||
|
||||
e.endpoint.hops = numHops;
|
||||
@@ -564,7 +567,8 @@ void
|
||||
Livecache<Allocator>::hops_t::remove(Element& e)
|
||||
{
|
||||
--m_hist[e.endpoint.hops];
|
||||
list_type& list(m_lists[e.endpoint.hops]);
|
||||
|
||||
auto& list = m_lists[e.endpoint.hops];
|
||||
list.erase(list.iterator_to(e));
|
||||
}
|
||||
|
||||
|
||||
@@ -762,6 +762,13 @@ public:
|
||||
void
|
||||
on_endpoints(SlotImp::ptr const& slot, Endpoints list)
|
||||
{
|
||||
// If we're sent too many endpoints, sample them at random:
|
||||
if (list.size() > Tuning::numberOfEndpointsMax)
|
||||
{
|
||||
std::shuffle(list.begin(), list.end(), default_prng());
|
||||
list.resize(Tuning::numberOfEndpointsMax);
|
||||
}
|
||||
|
||||
JLOG(m_journal.trace())
|
||||
<< beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
|
||||
<< " contained " << list.size()
|
||||
|
||||
@@ -102,7 +102,7 @@ SlotImp::recent_t::recent_t(clock_type& clock) : cache(clock)
|
||||
}
|
||||
|
||||
void
|
||||
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
|
||||
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, std::uint32_t hops)
|
||||
{
|
||||
auto const result(cache.emplace(ep, hops));
|
||||
if (!result.second)
|
||||
@@ -117,7 +117,7 @@ SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
|
||||
}
|
||||
|
||||
bool
|
||||
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, int hops)
|
||||
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, std::uint32_t hops)
|
||||
{
|
||||
auto const iter(cache.find(ep));
|
||||
if (iter == cache.end())
|
||||
|
||||
@@ -32,9 +32,6 @@ namespace PeerFinder {
|
||||
|
||||
class SlotImp : public Slot
|
||||
{
|
||||
private:
|
||||
using recent_type = beast::aged_unordered_map<beast::IP::Endpoint, int>;
|
||||
|
||||
public:
|
||||
using ptr = std::shared_ptr<SlotImp>;
|
||||
|
||||
@@ -155,18 +152,18 @@ public:
|
||||
sending a slot the same address too frequently.
|
||||
*/
|
||||
void
|
||||
insert(beast::IP::Endpoint const& ep, int hops);
|
||||
insert(beast::IP::Endpoint const& ep, std::uint32_t hops);
|
||||
|
||||
/** Returns `true` if we should not send endpoint to the slot. */
|
||||
bool
|
||||
filter(beast::IP::Endpoint const& ep, int hops);
|
||||
filter(beast::IP::Endpoint const& ep, std::uint32_t hops);
|
||||
|
||||
private:
|
||||
void
|
||||
expire();
|
||||
|
||||
friend class SlotImp;
|
||||
recent_type cache;
|
||||
beast::aged_unordered_map<beast::IP::Endpoint, std::uint32_t> cache;
|
||||
} recent;
|
||||
|
||||
void
|
||||
|
||||
@@ -106,43 +106,30 @@ static std::chrono::seconds const bootcacheCooldownTime(60);
|
||||
//
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
enum {
|
||||
// Drop incoming messages with hops greater than this number
|
||||
maxHops = 6
|
||||
// Drop incoming messages with hops greater than this number
|
||||
std::uint32_t constexpr maxHops = 6;
|
||||
|
||||
// How many Endpoint to send in each mtENDPOINTS
|
||||
,
|
||||
numberOfEndpoints = 2 * maxHops
|
||||
// How many Endpoint to send in each mtENDPOINTS
|
||||
std::uint32_t constexpr numberOfEndpoints = 2 * maxHops;
|
||||
|
||||
// The most Endpoint we will accept in mtENDPOINTS
|
||||
,
|
||||
numberOfEndpointsMax = 20
|
||||
// The most Endpoint we will accept in mtENDPOINTS
|
||||
std::uint32_t constexpr numberOfEndpointsMax =
|
||||
std::max<decltype(numberOfEndpoints)>(numberOfEndpoints * 2, 64);
|
||||
|
||||
// The number of peers that we want by default, unless an
|
||||
// explicit value is set in the config file.
|
||||
,
|
||||
defaultMaxPeerCount = 21
|
||||
|
||||
/** Number of addresses we provide when redirecting. */
|
||||
,
|
||||
redirectEndpointCount = 10
|
||||
};
|
||||
// Number of addresses we provide when redirecting.
|
||||
std::uint32_t constexpr redirectEndpointCount = 10;
|
||||
|
||||
// How often we send or accept mtENDPOINTS messages per peer
|
||||
// (we use a prime number of purpose)
|
||||
static std::chrono::seconds const secondsPerMessage(61);
|
||||
std::chrono::seconds constexpr secondsPerMessage(151);
|
||||
|
||||
// How long an Endpoint will stay in the cache
|
||||
// This should be a small multiple of the broadcast frequency
|
||||
static std::chrono::seconds const liveCacheSecondsToLive(30);
|
||||
|
||||
//
|
||||
//
|
||||
//
|
||||
std::chrono::seconds constexpr liveCacheSecondsToLive(30);
|
||||
|
||||
// How much time to wait before trying an outgoing address again.
|
||||
// Note that we ignore the port for purposes of comparison.
|
||||
static std::chrono::seconds const recentAttemptDuration(60);
|
||||
std::chrono::seconds constexpr recentAttemptDuration(60);
|
||||
|
||||
} // namespace Tuning
|
||||
/** @} */
|
||||
|
||||
@@ -48,7 +48,7 @@ public:
|
||||
// Add the address as an endpoint
|
||||
template <class C>
|
||||
inline void
|
||||
add(beast::IP::Endpoint ep, C& c, int hops = 0)
|
||||
add(beast::IP::Endpoint ep, C& c, std::uint32_t hops = 0)
|
||||
{
|
||||
Endpoint cep{ep, hops};
|
||||
c.insert(cep);
|
||||
@@ -139,7 +139,7 @@ public:
|
||||
for (auto i = 0; i < num_eps; ++i)
|
||||
add(beast::IP::randomEP(true),
|
||||
c,
|
||||
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
|
||||
ripple::rand_int<std::uint32_t>());
|
||||
auto h = c.hops.histogram();
|
||||
if (!BEAST_EXPECT(!h.empty()))
|
||||
return;
|
||||
@@ -163,7 +163,7 @@ public:
|
||||
for (auto i = 0; i < 100; ++i)
|
||||
add(beast::IP::randomEP(true),
|
||||
c,
|
||||
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
|
||||
ripple::rand_int(Tuning::maxHops + 1));
|
||||
|
||||
using at_hop = std::vector<ripple::PeerFinder::Endpoint>;
|
||||
using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>;
|
||||
|
||||
Reference in New Issue
Block a user