mirror of
				https://github.com/Xahau/xahaud.git
				synced 2025-11-04 02:35:48 +00:00 
			
		
		
		
	Improve handling of endpoints during peer discovery
This commit is contained in:
		@@ -34,11 +34,14 @@ Endpoint::Endpoint(Address const& addr, Port port) : m_addr(addr), m_port(port)
 | 
			
		||||
std::optional<Endpoint>
 | 
			
		||||
Endpoint::from_string_checked(std::string const& s)
 | 
			
		||||
{
 | 
			
		||||
    std::stringstream is(boost::trim_copy(s));
 | 
			
		||||
    Endpoint endpoint;
 | 
			
		||||
    is >> endpoint;
 | 
			
		||||
    if (!is.fail() && is.rdbuf()->in_avail() == 0)
 | 
			
		||||
        return endpoint;
 | 
			
		||||
    if (s.size() <= 64)
 | 
			
		||||
    {
 | 
			
		||||
        std::stringstream is(boost::trim_copy(s));
 | 
			
		||||
        Endpoint endpoint;
 | 
			
		||||
        is >> endpoint;
 | 
			
		||||
        if (!is.fail() && is.rdbuf()->in_avail() == 0)
 | 
			
		||||
            return endpoint;
 | 
			
		||||
    }
 | 
			
		||||
    return {};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1480,16 +1480,26 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
 | 
			
		||||
    if (tracking_.load() != Tracking::converged || m->version() != 2)
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    // The number is arbitrary and doesn't have any real significance or
 | 
			
		||||
    // implication for the protocol.
 | 
			
		||||
    if (m->endpoints_v2().size() >= 1024)
 | 
			
		||||
    {
 | 
			
		||||
        charge(Resource::feeBadData);
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::vector<PeerFinder::Endpoint> endpoints;
 | 
			
		||||
    endpoints.reserve(m->endpoints_v2().size());
 | 
			
		||||
 | 
			
		||||
    for (auto const& tm : m->endpoints_v2())
 | 
			
		||||
    {
 | 
			
		||||
        auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
 | 
			
		||||
 | 
			
		||||
        if (!result)
 | 
			
		||||
        {
 | 
			
		||||
            JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
 | 
			
		||||
                                     << tm.endpoint() << "}";
 | 
			
		||||
            charge(Resource::feeBadData);
 | 
			
		||||
            continue;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -1499,10 +1509,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
 | 
			
		||||
        // time, then we'll verify that their listener can receive incoming
 | 
			
		||||
        // by performing a connectivity test.  if hops > 0, then we just
 | 
			
		||||
        // take the address/port we were given
 | 
			
		||||
        if (tm.hops() == 0)
 | 
			
		||||
            result = remote_address_.at_port(result->port());
 | 
			
		||||
 | 
			
		||||
        endpoints.emplace_back(
 | 
			
		||||
            tm.hops() > 0 ? *result : remote_address_.at_port(result->port()),
 | 
			
		||||
            tm.hops());
 | 
			
		||||
        endpoints.emplace_back(*result, tm.hops());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!endpoints.empty())
 | 
			
		||||
 
 | 
			
		||||
@@ -112,16 +112,19 @@ struct Config
 | 
			
		||||
/** Describes a connectible peer address along with some metadata. */
 | 
			
		||||
struct Endpoint
 | 
			
		||||
{
 | 
			
		||||
    Endpoint();
 | 
			
		||||
    Endpoint() = default;
 | 
			
		||||
 | 
			
		||||
    Endpoint(beast::IP::Endpoint const& ep, int hops_);
 | 
			
		||||
    Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_);
 | 
			
		||||
 | 
			
		||||
    int hops;
 | 
			
		||||
    std::uint32_t hops = 0;
 | 
			
		||||
    beast::IP::Endpoint address;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
bool
 | 
			
		||||
operator<(Endpoint const& lhs, Endpoint const& rhs);
 | 
			
		||||
inline bool
 | 
			
		||||
operator<(Endpoint const& lhs, Endpoint const& rhs)
 | 
			
		||||
{
 | 
			
		||||
    return lhs.address < rhs.address;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/** A set of Endpoint used for connecting. */
 | 
			
		||||
using Endpoints = std::vector<Endpoint>;
 | 
			
		||||
 
 | 
			
		||||
@@ -18,24 +18,15 @@
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include <ripple/peerfinder/PeerfinderManager.h>
 | 
			
		||||
#include <ripple/peerfinder/impl/Tuning.h>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
namespace PeerFinder {
 | 
			
		||||
 | 
			
		||||
Endpoint::Endpoint() : hops(0)
 | 
			
		||||
Endpoint::Endpoint(beast::IP::Endpoint const& ep, std::uint32_t hops_)
 | 
			
		||||
    : hops(std::min(hops_, Tuning::maxHops + 1)), address(ep)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
Endpoint::Endpoint(beast::IP::Endpoint const& ep, int hops_)
 | 
			
		||||
    : hops(hops_), address(ep)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool
 | 
			
		||||
operator<(Endpoint const& lhs, Endpoint const& rhs)
 | 
			
		||||
{
 | 
			
		||||
    return lhs.address < rhs.address;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace PeerFinder
 | 
			
		||||
}  // namespace ripple
 | 
			
		||||
 
 | 
			
		||||
@@ -363,7 +363,7 @@ public:
 | 
			
		||||
 | 
			
		||||
        // Reinsert e at a new hops
 | 
			
		||||
        void
 | 
			
		||||
        reinsert(Element& e, int hops);
 | 
			
		||||
        reinsert(Element& e, std::uint32_t hops);
 | 
			
		||||
 | 
			
		||||
        void
 | 
			
		||||
        remove(Element& e);
 | 
			
		||||
@@ -444,8 +444,7 @@ Livecache<Allocator>::insert(Endpoint const& ep)
 | 
			
		||||
    // when redirecting.
 | 
			
		||||
    //
 | 
			
		||||
    assert(ep.hops <= (Tuning::maxHops + 1));
 | 
			
		||||
    std::pair<typename cache_type::iterator, bool> result(
 | 
			
		||||
        m_cache.emplace(ep.address, ep));
 | 
			
		||||
    auto result = m_cache.emplace(ep.address, ep);
 | 
			
		||||
    Element& e(result.first->second);
 | 
			
		||||
    if (result.second)
 | 
			
		||||
    {
 | 
			
		||||
@@ -522,12 +521,14 @@ template <class Allocator>
 | 
			
		||||
std::string
 | 
			
		||||
Livecache<Allocator>::hops_t::histogram() const
 | 
			
		||||
{
 | 
			
		||||
    std::stringstream ss;
 | 
			
		||||
    for (typename decltype(m_hist)::size_type i(0); i < m_hist.size(); ++i)
 | 
			
		||||
    std::string s;
 | 
			
		||||
    for (auto const& h : m_hist)
 | 
			
		||||
    {
 | 
			
		||||
        ss << m_hist[i] << ((i < Tuning::maxHops + 1) ? ", " : "");
 | 
			
		||||
        if (!s.empty())
 | 
			
		||||
            s += ", ";
 | 
			
		||||
        s += std::to_string(h);
 | 
			
		||||
    }
 | 
			
		||||
    return ss.str();
 | 
			
		||||
    return s;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <class Allocator>
 | 
			
		||||
@@ -540,7 +541,7 @@ template <class Allocator>
 | 
			
		||||
void
 | 
			
		||||
Livecache<Allocator>::hops_t::insert(Element& e)
 | 
			
		||||
{
 | 
			
		||||
    assert(e.endpoint.hops >= 0 && e.endpoint.hops <= Tuning::maxHops + 1);
 | 
			
		||||
    assert(e.endpoint.hops <= Tuning::maxHops + 1);
 | 
			
		||||
    // This has security implications without a shuffle
 | 
			
		||||
    m_lists[e.endpoint.hops].push_front(e);
 | 
			
		||||
    ++m_hist[e.endpoint.hops];
 | 
			
		||||
@@ -548,11 +549,13 @@ Livecache<Allocator>::hops_t::insert(Element& e)
 | 
			
		||||
 | 
			
		||||
template <class Allocator>
 | 
			
		||||
void
 | 
			
		||||
Livecache<Allocator>::hops_t::reinsert(Element& e, int numHops)
 | 
			
		||||
Livecache<Allocator>::hops_t::reinsert(Element& e, std::uint32_t numHops)
 | 
			
		||||
{
 | 
			
		||||
    assert(numHops >= 0 && numHops <= Tuning::maxHops + 1);
 | 
			
		||||
    list_type& list(m_lists[e.endpoint.hops]);
 | 
			
		||||
    assert(numHops <= Tuning::maxHops + 1);
 | 
			
		||||
 | 
			
		||||
    auto& list = m_lists[e.endpoint.hops];
 | 
			
		||||
    list.erase(list.iterator_to(e));
 | 
			
		||||
 | 
			
		||||
    --m_hist[e.endpoint.hops];
 | 
			
		||||
 | 
			
		||||
    e.endpoint.hops = numHops;
 | 
			
		||||
@@ -564,7 +567,8 @@ void
 | 
			
		||||
Livecache<Allocator>::hops_t::remove(Element& e)
 | 
			
		||||
{
 | 
			
		||||
    --m_hist[e.endpoint.hops];
 | 
			
		||||
    list_type& list(m_lists[e.endpoint.hops]);
 | 
			
		||||
 | 
			
		||||
    auto& list = m_lists[e.endpoint.hops];
 | 
			
		||||
    list.erase(list.iterator_to(e));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -762,6 +762,13 @@ public:
 | 
			
		||||
    void
 | 
			
		||||
    on_endpoints(SlotImp::ptr const& slot, Endpoints list)
 | 
			
		||||
    {
 | 
			
		||||
        // If we're sent too many endpoints, sample them at random:
 | 
			
		||||
        if (list.size() > Tuning::numberOfEndpointsMax)
 | 
			
		||||
        {
 | 
			
		||||
            std::shuffle(list.begin(), list.end(), default_prng());
 | 
			
		||||
            list.resize(Tuning::numberOfEndpointsMax);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        JLOG(m_journal.trace())
 | 
			
		||||
            << beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
 | 
			
		||||
            << " contained " << list.size()
 | 
			
		||||
 
 | 
			
		||||
@@ -102,7 +102,7 @@ SlotImp::recent_t::recent_t(clock_type& clock) : cache(clock)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
 | 
			
		||||
SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, std::uint32_t hops)
 | 
			
		||||
{
 | 
			
		||||
    auto const result(cache.emplace(ep, hops));
 | 
			
		||||
    if (!result.second)
 | 
			
		||||
@@ -117,7 +117,7 @@ SlotImp::recent_t::insert(beast::IP::Endpoint const& ep, int hops)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool
 | 
			
		||||
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, int hops)
 | 
			
		||||
SlotImp::recent_t::filter(beast::IP::Endpoint const& ep, std::uint32_t hops)
 | 
			
		||||
{
 | 
			
		||||
    auto const iter(cache.find(ep));
 | 
			
		||||
    if (iter == cache.end())
 | 
			
		||||
 
 | 
			
		||||
@@ -32,9 +32,6 @@ namespace PeerFinder {
 | 
			
		||||
 | 
			
		||||
class SlotImp : public Slot
 | 
			
		||||
{
 | 
			
		||||
private:
 | 
			
		||||
    using recent_type = beast::aged_unordered_map<beast::IP::Endpoint, int>;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    using ptr = std::shared_ptr<SlotImp>;
 | 
			
		||||
 | 
			
		||||
@@ -155,18 +152,18 @@ public:
 | 
			
		||||
            sending a slot the same address too frequently.
 | 
			
		||||
        */
 | 
			
		||||
        void
 | 
			
		||||
        insert(beast::IP::Endpoint const& ep, int hops);
 | 
			
		||||
        insert(beast::IP::Endpoint const& ep, std::uint32_t hops);
 | 
			
		||||
 | 
			
		||||
        /** Returns `true` if we should not send endpoint to the slot. */
 | 
			
		||||
        bool
 | 
			
		||||
        filter(beast::IP::Endpoint const& ep, int hops);
 | 
			
		||||
        filter(beast::IP::Endpoint const& ep, std::uint32_t hops);
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        void
 | 
			
		||||
        expire();
 | 
			
		||||
 | 
			
		||||
        friend class SlotImp;
 | 
			
		||||
        recent_type cache;
 | 
			
		||||
        beast::aged_unordered_map<beast::IP::Endpoint, std::uint32_t> cache;
 | 
			
		||||
    } recent;
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
 
 | 
			
		||||
@@ -106,43 +106,30 @@ static std::chrono::seconds const bootcacheCooldownTime(60);
 | 
			
		||||
//
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
enum {
 | 
			
		||||
    // Drop incoming messages with hops greater than this number
 | 
			
		||||
    maxHops = 6
 | 
			
		||||
// Drop incoming messages with hops greater than this number
 | 
			
		||||
std::uint32_t constexpr maxHops = 6;
 | 
			
		||||
 | 
			
		||||
    // How many Endpoint to send in each mtENDPOINTS
 | 
			
		||||
    ,
 | 
			
		||||
    numberOfEndpoints = 2 * maxHops
 | 
			
		||||
// How many Endpoint to send in each mtENDPOINTS
 | 
			
		||||
std::uint32_t constexpr numberOfEndpoints = 2 * maxHops;
 | 
			
		||||
 | 
			
		||||
    // The most Endpoint we will accept in mtENDPOINTS
 | 
			
		||||
    ,
 | 
			
		||||
    numberOfEndpointsMax = 20
 | 
			
		||||
// The most Endpoint we will accept in mtENDPOINTS
 | 
			
		||||
std::uint32_t constexpr numberOfEndpointsMax =
 | 
			
		||||
    std::max<decltype(numberOfEndpoints)>(numberOfEndpoints * 2, 64);
 | 
			
		||||
 | 
			
		||||
    // The number of peers that we want by default, unless an
 | 
			
		||||
    // explicit value is set in the config file.
 | 
			
		||||
    ,
 | 
			
		||||
    defaultMaxPeerCount = 21
 | 
			
		||||
 | 
			
		||||
    /** Number of addresses we provide when redirecting. */
 | 
			
		||||
    ,
 | 
			
		||||
    redirectEndpointCount = 10
 | 
			
		||||
};
 | 
			
		||||
// Number of addresses we provide when redirecting.
 | 
			
		||||
std::uint32_t constexpr redirectEndpointCount = 10;
 | 
			
		||||
 | 
			
		||||
// How often we send or accept mtENDPOINTS messages per peer
 | 
			
		||||
// (we use a prime number of purpose)
 | 
			
		||||
static std::chrono::seconds const secondsPerMessage(61);
 | 
			
		||||
std::chrono::seconds constexpr secondsPerMessage(151);
 | 
			
		||||
 | 
			
		||||
// How long an Endpoint will stay in the cache
 | 
			
		||||
// This should be a small multiple of the broadcast frequency
 | 
			
		||||
static std::chrono::seconds const liveCacheSecondsToLive(30);
 | 
			
		||||
 | 
			
		||||
//
 | 
			
		||||
//
 | 
			
		||||
//
 | 
			
		||||
std::chrono::seconds constexpr liveCacheSecondsToLive(30);
 | 
			
		||||
 | 
			
		||||
// How much time to wait before trying an outgoing address again.
 | 
			
		||||
// Note that we ignore the port for purposes of comparison.
 | 
			
		||||
static std::chrono::seconds const recentAttemptDuration(60);
 | 
			
		||||
std::chrono::seconds constexpr recentAttemptDuration(60);
 | 
			
		||||
 | 
			
		||||
}  // namespace Tuning
 | 
			
		||||
/** @} */
 | 
			
		||||
 
 | 
			
		||||
@@ -48,7 +48,7 @@ public:
 | 
			
		||||
    // Add the address as an endpoint
 | 
			
		||||
    template <class C>
 | 
			
		||||
    inline void
 | 
			
		||||
    add(beast::IP::Endpoint ep, C& c, int hops = 0)
 | 
			
		||||
    add(beast::IP::Endpoint ep, C& c, std::uint32_t hops = 0)
 | 
			
		||||
    {
 | 
			
		||||
        Endpoint cep{ep, hops};
 | 
			
		||||
        c.insert(cep);
 | 
			
		||||
@@ -139,7 +139,7 @@ public:
 | 
			
		||||
        for (auto i = 0; i < num_eps; ++i)
 | 
			
		||||
            add(beast::IP::randomEP(true),
 | 
			
		||||
                c,
 | 
			
		||||
                ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
 | 
			
		||||
                ripple::rand_int<std::uint32_t>());
 | 
			
		||||
        auto h = c.hops.histogram();
 | 
			
		||||
        if (!BEAST_EXPECT(!h.empty()))
 | 
			
		||||
            return;
 | 
			
		||||
@@ -163,7 +163,7 @@ public:
 | 
			
		||||
        for (auto i = 0; i < 100; ++i)
 | 
			
		||||
            add(beast::IP::randomEP(true),
 | 
			
		||||
                c,
 | 
			
		||||
                ripple::rand_int(0, safe_cast<int>(Tuning::maxHops + 1)));
 | 
			
		||||
                ripple::rand_int(Tuning::maxHops + 1));
 | 
			
		||||
 | 
			
		||||
        using at_hop = std::vector<ripple::PeerFinder::Endpoint>;
 | 
			
		||||
        using all_hops = std::array<at_hop, 1 + Tuning::maxHops + 1>;
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user