mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-23 20:45:51 +00:00
Improve handling of endpoints during peer discovery
This commit is contained in:
@@ -33,12 +33,15 @@ Endpoint::Endpoint(Address const& addr, Port port) : m_addr(addr), m_port(port)
|
|||||||
|
|
||||||
std::optional<Endpoint>
|
std::optional<Endpoint>
|
||||||
Endpoint::from_string_checked(std::string const& s)
|
Endpoint::from_string_checked(std::string const& s)
|
||||||
|
{
|
||||||
|
if (s.size() <= 64)
|
||||||
{
|
{
|
||||||
std::stringstream is(boost::trim_copy(s));
|
std::stringstream is(boost::trim_copy(s));
|
||||||
Endpoint endpoint;
|
Endpoint endpoint;
|
||||||
is >> endpoint;
|
is >> endpoint;
|
||||||
if (!is.fail() && is.rdbuf()->in_avail() == 0)
|
if (!is.fail() && is.rdbuf()->in_avail() == 0)
|
||||||
return endpoint;
|
return endpoint;
|
||||||
|
}
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1480,16 +1480,26 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
|||||||
if (tracking_.load() != Tracking::converged || m->version() != 2)
|
if (tracking_.load() != Tracking::converged || m->version() != 2)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// The number is arbitrary and doesn't have any real significance or
|
||||||
|
// implication for the protocol.
|
||||||
|
if (m->endpoints_v2().size() >= 1024)
|
||||||
|
{
|
||||||
|
charge(Resource::feeBadData);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<PeerFinder::Endpoint> endpoints;
|
std::vector<PeerFinder::Endpoint> endpoints;
|
||||||
endpoints.reserve(m->endpoints_v2().size());
|
endpoints.reserve(m->endpoints_v2().size());
|
||||||
|
|
||||||
for (auto const& tm : m->endpoints_v2())
|
for (auto const& tm : m->endpoints_v2())
|
||||||
{
|
{
|
||||||
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
|
auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
|
||||||
|
|
||||||
if (!result)
|
if (!result)
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
|
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
|
||||||
<< tm.endpoint() << "}";
|
<< tm.endpoint() << "}";
|
||||||
|
charge(Resource::feeBadData);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1499,10 +1509,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
|||||||
// time, then we'll verify that their listener can receive incoming
|
// time, then we'll verify that their listener can receive incoming
|
||||||
// by performing a connectivity test. if hops > 0, then we just
|
// by performing a connectivity test. if hops > 0, then we just
|
||||||
// take the address/port we were given
|
// take the address/port we were given
|
||||||
|
if (tm.hops() == 0)
|
||||||
|
result = remote_address_.at_port(result->port());
|
||||||
|
|
||||||
endpoints.emplace_back(
|
endpoints.emplace_back(*result, tm.hops());
|
||||||
tm.hops() > 0 ? *result : remote_address_.at_port(result->port()),
|
|
||||||
tm.hops());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!endpoints.empty())
|
if (!endpoints.empty())
|
||||||
|
|||||||
@@ -112,16 +112,19 @@ struct Config
|
|||||||
/** Describes a connectible peer address along with some metadata. */
|
/** Describes a connectible peer address along with some metadata. */
|
||||||
struct Endpoint
|
struct Endpoint
|
||||||
{
|
{
|
||||||
Endpoint();
|
Endpoint() = default;
|
||||||
|
|
||||||
Endpoint(beast::IP::Endpoint const& ep, int hops_);
|
Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_);
|
||||||
|
|
||||||
int hops;
|
std::uint32_t hops = 0;
|
||||||
beast::IP::Endpoint address;
|
beast::IP::Endpoint address;
|
||||||
};
|
};
|
||||||
|
|
||||||
bool
|
inline bool
|
||||||
operator<(Endpoint const& lhs, Endpoint const& rhs);
|
operator<(Endpoint const& lhs, Endpoint const& rhs)
|
||||||
|
{
|
||||||
|
return lhs.address < rhs.address;
|
||||||
|
}
|
||||||
|
|
||||||
/** A set of Endpoint used for connecting. */
|
/** A set of Endpoint used for connecting. */
|
||||||
using Endpoints = std::vector<Endpoint>;
|
using Endpoints = std::vector<Endpoint>;
|
||||||
|
|||||||
@@ -18,24 +18,15 @@
|
|||||||
//==============================================================================
|
//==============================================================================
|
||||||
|
|
||||||
#include <ripple/peerfinder/PeerfinderManager.h>
|
#include <ripple/peerfinder/PeerfinderManager.h>
|
||||||
|
#include <ripple/peerfinder/impl/Tuning.h>
|
||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
namespace PeerFinder {
|
namespace PeerFinder {
|
||||||
|
|
||||||
Endpoint::Endpoint() : hops(0)
|
Endpoint::Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_)
|
||||||
|
: hops(std::min(hops_, Tuning::maxHops + 1)), address(ep)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
Endpoint::Endpoint(beast::IP::Endpoint const& ep, int hops_)
|
|
||||||
: hops(hops_), address(ep)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
|
||||||
operator<(Endpoint const& lhs, Endpoint const& rhs)
|
|
||||||
{
|
|
||||||
return lhs.address < rhs.address;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace PeerFinder
|
} // namespace PeerFinder
|
||||||
} // namespace ripple
|
} // namespace ripple
|
||||||
|
|||||||
@@ -363,7 +363,7 @@ public:
|
|||||||
|
|
||||||
// Reinsert e at a new hops
|
// Reinsert e at a new hops
|
||||||
void
|
void
|
||||||
reinsert(Element& e, int hops);
|
reinsert(Element& e, std::uint32_t hops);
|
||||||
|
|
||||||
void
|
void
|
||||||
remove(Element& e);
|
remove(Element& e);
|
||||||
@@ -444,8 +444,7 @@ Livecache<Allocator>::insert(Endpoint const& ep)
|
|||||||
// when redirecting.
|
// when redirecting.
|
||||||
//
|
//
|
||||||
assert(ep.hops <= (Tuning::maxHops + 1));
|
assert(ep.hops <= (Tuning::maxHops + 1));
|
||||||
std::pair<typename cache_type::iterator, bool> result(
|
auto result = m_cache.emplace(ep.address, ep);
|
||||||
m_cache.emplace(ep.address, ep));
|
|
||||||
Element& e(result.first->second);
|
Element& e(result.first->second);
|
||||||
if (result.second)
|
if (result.second)
|
||||||
{
|
{
|
||||||
@@ -522,12 +521,14 @@ template <class Allocator>
|
|||||||
std::string
|
std::string
|
||||||
Livecache<Allocator>::hops_t::histogram() const
|
Livecache<Allocator>::hops_t::histogram() const
|
||||||
{
|
{
|
||||||
std::stringstream ss;
|
std::string s;
|
||||||
for (typename decltype(m_hist)::size_type i(0); i < m_hist.size(); ++i)
|
for (auto const& h : m_hist)
|
||||||
{
|
{
|
||||||
ss << m_hist[i] << ((i < Tuning::maxHops + 1) ? ", " : "");
|
if (!s.empty())
|
||||||
|
s += ", ";
|
||||||
|
s += std::to_string(h);
|
||||||
}
|
}
|
||||||
return ss.str();
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class Allocator>
|
template <class Allocator>
|
||||||
@@ -540,7 +541,7 @@ template <class Allocator>
|
|||||||
void
|
void
|
||||||
Livecache<Allocator>::hops_t::insert(Element& e)
|
Livecache<Allocator>::hops_t::insert(Element& e)
|
||||||
{
|
{
|
||||||
assert(e.endpoint.hops >= 0 && e.endpoint.hops <= Tuning::maxHops + 1);
|
assert(e.endpoint.hops <= Tuning::maxHops + 1);
|
||||||
// This has security implications without a shuffle
|
// This has security implications without a shuffle
|
||||||
m_lists[e.endpoint.hops].push_front(e);
|
m_lists[e.endpoint.hops].push_front(e);
|
||||||
++m_hist[e.endpoint.hops];
|
++m_hist[e.endpoint.hops];
|
||||||
@@ -548,11 +549,13 @@ Livecache<Allocator>::hops_t::insert(Element& e)
|
|||||||
|
|
||||||
template <class Allocator>
|
template <class Allocator>
|
||||||
void
|
void
|
||||||
Livecache<Allocator>::hops_t::reinsert(Element& e, int numHops)
|
Livecache<Allocator>::hops_t::reinsert(Element& e, std::uint32_t numHops)
|
||||||
{
|
{
|
||||||
assert(numHops >= 0 && numHops <= Tuning::maxHops + 1);
|
assert(numHops <= Tuning::maxHops + 1);
|
||||||
list_type& list(m_lists[e.endpoint.hops]);
|
|
||||||
|
auto& list = m_lists[e.endpoint.hops];
|
||||||
list.erase(list.iterator_to(e));
|
list.erase(list.iterator_to(e));
|
||||||
|
|
||||||
--m_hist[e.endpoint.hops];
|
--m_hist[e.endpoint.hops];
|
||||||
|
|
||||||
e.endpoint.hops = numHops;
|
e.endpoint.hops = numHops;
|
||||||
@@ -564,7 +567,8 @@ void
|
|||||||
Livecache<Allocator>::hops_t::remove(Element& e)
|
Livecache<Allocator>::hops_t::remove(Element& e)
|
||||||
{
|
{
|
||||||
--m_hist[e.endpoint.hops];
|
--m_hist[e.endpoint.hops];
|
||||||
list_type& list(m_lists[e.endpoint.hops]);
|
|
||||||
|
auto& list = m_lists[e.endpoint.hops];
|
||||||
list.erase(list.iterator_to(e));
|
list.erase(list.iterator_to(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -762,6 +762,13 @@ public:
|
|||||||
void
|
void
|
||||||
on_endpoints(SlotImp::ptr const& slot, Endpoints list)
|
on_endpoints(SlotImp::ptr const& slot, Endpoints list)
|
||||||
{
|
{
|
||||||
|
// If we're sent too many endpoints, sample them at random:
|
||||||
|
if (list.size() > Tuning::numberOfEndpointsMax)
|
||||||
|
{
|
||||||
|
std::shuffle(list.begin(), list.end(), default_prng());
|
||||||
|
list.resize(Tuning::numberOfEndpointsMax);
|
||||||
|
}
|
||||||
|
|
||||||
JLOG(m_journal.trace())
|
JLOG(m_journal.trace())
|
||||||
<< beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
|
<< beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
|
||||||
<< " contained " << list.size()
|
<< " contained " << list.size()
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ SlotImp::recent_t::recent_t(clock_type& clock) : cache(clock)
|
|||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
|
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, std::uint32_t hops)
|
||||||
{
|
{
|
||||||
auto const result(cache.emplace(ep, hops));
|
auto const result(cache.emplace(ep, hops));
|
||||||
if (!result.second)
|
if (!result.second)
|
||||||
@@ -117,7 +117,7 @@ SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, int hops)
|
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, std::uint32_t hops)
|
||||||
{
|
{
|
||||||
auto const iter(cache.find(ep));
|
auto const iter(cache.find(ep));
|
||||||
if (iter == cache.end())
|
if (iter == cache.end())
|
||||||
|
|||||||
@@ -32,9 +32,6 @@ namespace PeerFinder {
|
|||||||
|
|
||||||
class SlotImp : public Slot
|
class SlotImp : public Slot
|
||||||
{
|
{
|
||||||
private:
|
|
||||||
using recent_type = beast::aged_unordered_map<beast::IP::Endpoint, int>;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
using ptr = std::shared_ptr<SlotImp>;
|
using ptr = std::shared_ptr<SlotImp>;
|
||||||
|
|
||||||
@@ -155,18 +152,18 @@ public:
|
|||||||
sending a slot the same address too frequently.
|
sending a slot the same address too frequently.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
insert(beast::IP::Endpoint const& ep, int hops);
|
insert(beast::IP::Endpoint const& ep, std::uint32_t hops);
|
||||||
|
|
||||||
/** Returns `true` if we should not send endpoint to the slot. */
|
/** Returns `true` if we should not send endpoint to the slot. */
|
||||||
bool
|
bool
|
||||||
filter(beast::IP::Endpoint const& ep, int hops);
|
filter(beast::IP::Endpoint const& ep, std::uint32_t hops);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void
|
void
|
||||||
expire();
|
expire();
|
||||||
|
|
||||||
friend class SlotImp;
|
friend class SlotImp;
|
||||||
recent_type cache;
|
beast::aged_unordered_map<beast::IP::Endpoint, std::uint32_t> cache;
|
||||||
} recent;
|
} recent;
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|||||||
@@ -106,43 +106,30 @@ static std::chrono::seconds const bootcacheCooldownTime(60);
|
|||||||
//
|
//
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
enum {
|
|
||||||
// Drop incoming messages with hops greater than this number
|
// Drop incoming messages with hops greater than this number
|
||||||
maxHops = 6
|
std::uint32_t constexpr maxHops = 6;
|
||||||
|
|
||||||
// How many Endpoint to send in each mtENDPOINTS
|
// How many Endpoint to send in each mtENDPOINTS
|
||||||
,
|
std::uint32_t constexpr numberOfEndpoints = 2 * maxHops;
|
||||||
numberOfEndpoints = 2 * maxHops
|
|
||||||
|
|
||||||
// The most Endpoint we will accept in mtENDPOINTS
|
// The most Endpoint we will accept in mtENDPOINTS
|
||||||
,
|
std::uint32_t constexpr numberOfEndpointsMax =
|
||||||
numberOfEndpointsMax = 20
|
std::max<decltype(numberOfEndpoints)>(numberOfEndpoints * 2, 64);
|
||||||
|
|
||||||
// The number of peers that we want by default, unless an
|
// Number of addresses we provide when redirecting.
|
||||||
// explicit value is set in the config file.
|
std::uint32_t constexpr redirectEndpointCount = 10;
|
||||||
,
|
|
||||||
defaultMaxPeerCount = 21
|
|
||||||
|
|
||||||
/** Number of addresses we provide when redirecting. */
|
|
||||||
,
|
|
||||||
redirectEndpointCount = 10
|
|
||||||
};
|
|
||||||
|
|
||||||
// How often we send or accept mtENDPOINTS messages per peer
|
// How often we send or accept mtENDPOINTS messages per peer
|
||||||
// (we use a prime number of purpose)
|
// (we use a prime number of purpose)
|
||||||
static std::chrono::seconds const secondsPerMessage(61);
|
std::chrono::seconds constexpr secondsPerMessage(151);
|
||||||
|
|
||||||
// How long an Endpoint will stay in the cache
|
// How long an Endpoint will stay in the cache
|
||||||
// This should be a small multiple of the broadcast frequency
|
// This should be a small multiple of the broadcast frequency
|
||||||
static std::chrono::seconds const liveCacheSecondsToLive(30);
|
std::chrono::seconds constexpr liveCacheSecondsToLive(30);
|
||||||
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
// How much time to wait before trying an outgoing address again.
|
// How much time to wait before trying an outgoing address again.
|
||||||
// Note that we ignore the port for purposes of comparison.
|
// Note that we ignore the port for purposes of comparison.
|
||||||
static std::chrono::seconds const recentAttemptDuration(60);
|
std::chrono::seconds constexpr recentAttemptDuration(60);
|
||||||
|
|
||||||
} // namespace Tuning
|
} // namespace Tuning
|
||||||
/** @} */
|
/** @} */
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ public:
|
|||||||
// Add the address as an endpoint
|
// Add the address as an endpoint
|
||||||
template <class C>
|
template <class C>
|
||||||
inline void
|
inline void
|
||||||
add(beast::IP::Endpoint ep, C& c, int hops = 0)
|
add(beast::IP::Endpoint ep, C& c, std::uint32_t hops = 0)
|
||||||
{
|
{
|
||||||
Endpoint cep{ep, hops};
|
Endpoint cep{ep, hops};
|
||||||
c.insert(cep);
|
c.insert(cep);
|
||||||
@@ -139,7 +139,7 @@ public:
|
|||||||
for (auto i = 0; i < num_eps; ++i)
|
for (auto i = 0; i < num_eps; ++i)
|
||||||
add(beast::IP::randomEP(true),
|
add(beast::IP::randomEP(true),
|
||||||
c,
|
c,
|
||||||
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
|
ripple::rand_int<std::uint32_t>());
|
||||||
auto h = c.hops.histogram();
|
auto h = c.hops.histogram();
|
||||||
if (!BEAST_EXPECT(!h.empty()))
|
if (!BEAST_EXPECT(!h.empty()))
|
||||||
return;
|
return;
|
||||||
@@ -163,7 +163,7 @@ public:
|
|||||||
for (auto i = 0; i < 100; ++i)
|
for (auto i = 0; i < 100; ++i)
|
||||||
add(beast::IP::randomEP(true),
|
add(beast::IP::randomEP(true),
|
||||||
c,
|
c,
|
||||||
ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
|
ripple::rand_int(Tuning::maxHops + 1));
|
||||||
|
|
||||||
using at_hop = std::vector<ripple::PeerFinder::Endpoint>;
|
using at_hop = std::vector<ripple::PeerFinder::Endpoint>;
|
||||||
using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>;
|
using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>;
|
||||||
|
|||||||
Reference in New Issue
Block a user