mirror of
				https://github.com/XRPLF/rippled.git
				synced 2025-11-04 11:15:56 +00:00 
			
		
		
		
	Compare commits
	
		
			15 Commits
		
	
	
		
			a1q123456/
			...
			tapanito/e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					c142e1d86b | ||
| 
						 | 
					feef06393a | ||
| 
						 | 
					e80f46f296 | ||
| 
						 | 
					72b19a1c89 | ||
| 
						 | 
					2a9c38693a | ||
| 
						 | 
					261cf0c74c | ||
| 
						 | 
					a038b70bf4 | ||
| 
						 | 
					d4c6910c8b | ||
| 
						 | 
					9ecb457e55 | ||
| 
						 | 
					71871bb9b6 | ||
| 
						 | 
					ba536ebfd8 | ||
| 
						 | 
					1e02961a63 | ||
| 
						 | 
					5613dab898 | ||
| 
						 | 
					69aec23e1b | ||
| 
						 | 
					34c3591554 | 
@@ -17,8 +17,8 @@
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include <test/jtx.h>
 | 
			
		||||
#include <test/jtx/Env.h>
 | 
			
		||||
#include <test/overlay/clock.h>
 | 
			
		||||
 | 
			
		||||
#include <xrpld/overlay/Message.h>
 | 
			
		||||
#include <xrpld/overlay/Peer.h>
 | 
			
		||||
@@ -33,8 +33,8 @@
 | 
			
		||||
 | 
			
		||||
#include <boost/thread.hpp>
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <algorithm>
 | 
			
		||||
#include <iterator>
 | 
			
		||||
#include <numeric>
 | 
			
		||||
#include <optional>
 | 
			
		||||
 | 
			
		||||
@@ -191,52 +191,6 @@ public:
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/** Manually advanced clock. */
 | 
			
		||||
class ManualClock
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
    typedef uint64_t rep;
 | 
			
		||||
    typedef std::milli period;
 | 
			
		||||
    typedef std::chrono::duration<std::uint32_t, period> duration;
 | 
			
		||||
    typedef std::chrono::time_point<ManualClock> time_point;
 | 
			
		||||
    inline static bool const is_steady = false;
 | 
			
		||||
 | 
			
		||||
    static void
 | 
			
		||||
    advance(duration d) noexcept
 | 
			
		||||
    {
 | 
			
		||||
        now_ += d;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static void
 | 
			
		||||
    randAdvance(milliseconds min, milliseconds max)
 | 
			
		||||
    {
 | 
			
		||||
        now_ += randDuration(min, max);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static void
 | 
			
		||||
    reset() noexcept
 | 
			
		||||
    {
 | 
			
		||||
        now_ = time_point(seconds(0));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static time_point
 | 
			
		||||
    now() noexcept
 | 
			
		||||
    {
 | 
			
		||||
        return now_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static duration
 | 
			
		||||
    randDuration(milliseconds min, milliseconds max)
 | 
			
		||||
    {
 | 
			
		||||
        return duration(milliseconds(rand_int(min.count(), max.count())));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    explicit ManualClock() = default;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    inline static time_point now_ = time_point(seconds(0));
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/** Simulate server's OverlayImpl */
 | 
			
		||||
class Overlay
 | 
			
		||||
{
 | 
			
		||||
@@ -249,8 +203,7 @@ public:
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        Peer::id_t id,
 | 
			
		||||
        SquelchCB f,
 | 
			
		||||
        protocol::MessageType type = protocol::mtVALIDATION) = 0;
 | 
			
		||||
        SquelchCB f) = 0;
 | 
			
		||||
 | 
			
		||||
    virtual void deleteIdlePeers(UnsquelchCB) = 0;
 | 
			
		||||
 | 
			
		||||
@@ -538,8 +491,14 @@ public:
 | 
			
		||||
    std::uint16_t
 | 
			
		||||
    inState(PublicKey const& validator, reduce_relay::PeerState state)
 | 
			
		||||
    {
 | 
			
		||||
        auto res = slots_.inState(validator, state);
 | 
			
		||||
        return res ? *res : 0;
 | 
			
		||||
        auto const& it = slots_.slots_.find(validator);
 | 
			
		||||
        if (it != slots_.slots_.end())
 | 
			
		||||
            return std::count_if(
 | 
			
		||||
                it->second.peers_.begin(),
 | 
			
		||||
                it->second.peers_.end(),
 | 
			
		||||
                [&](auto const& it) { return (it.second.state == state); });
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
@@ -547,11 +506,10 @@ public:
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        Peer::id_t id,
 | 
			
		||||
        SquelchCB f,
 | 
			
		||||
        protocol::MessageType type = protocol::mtVALIDATION) override
 | 
			
		||||
        SquelchCB f) override
 | 
			
		||||
    {
 | 
			
		||||
        squelch_ = f;
 | 
			
		||||
        slots_.updateSlotAndSquelch(key, validator, id, type);
 | 
			
		||||
        slots_.updateSlotAndSquelch(key, validator, id, true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
@@ -632,40 +590,56 @@ public:
 | 
			
		||||
    bool
 | 
			
		||||
    isCountingState(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        return slots_.inState(validator, reduce_relay::SlotState::Counting);
 | 
			
		||||
        auto const& it = slots_.slots_.find(validator);
 | 
			
		||||
        if (it != slots_.slots_.end())
 | 
			
		||||
            return it->second.state_ == reduce_relay::SlotState::Counting;
 | 
			
		||||
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::set<id_t>
 | 
			
		||||
    getSelected(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        return slots_.getSelected(validator);
 | 
			
		||||
        auto const& it = slots_.slots_.find(validator);
 | 
			
		||||
        if (it == slots_.slots_.end())
 | 
			
		||||
            return {};
 | 
			
		||||
 | 
			
		||||
        std::set<id_t> r;
 | 
			
		||||
        for (auto const& [id, info] : it->second.peers_)
 | 
			
		||||
            if (info.state == reduce_relay::PeerState::Selected)
 | 
			
		||||
                r.insert(id);
 | 
			
		||||
 | 
			
		||||
        return r;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    bool
 | 
			
		||||
    isSelected(PublicKey const& validator, Peer::id_t peer)
 | 
			
		||||
    {
 | 
			
		||||
        auto selected = slots_.getSelected(validator);
 | 
			
		||||
        auto selected = getSelected(validator);
 | 
			
		||||
        return selected.find(peer) != selected.end();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    id_t
 | 
			
		||||
    getSelectedPeer(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        auto selected = slots_.getSelected(validator);
 | 
			
		||||
        auto selected = getSelected(validator);
 | 
			
		||||
        assert(selected.size());
 | 
			
		||||
        return *selected.begin();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::unordered_map<
 | 
			
		||||
        id_t,
 | 
			
		||||
        std::tuple<
 | 
			
		||||
            reduce_relay::PeerState,
 | 
			
		||||
            std::uint16_t,
 | 
			
		||||
            std::uint32_t,
 | 
			
		||||
            std::uint32_t>>
 | 
			
		||||
    std::unordered_map<id_t, reduce_relay::Slot<clock_type>::PeerInfo>
 | 
			
		||||
    getPeers(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        return slots_.getPeers(validator);
 | 
			
		||||
        auto const& it = slots_.slots_.find(validator);
 | 
			
		||||
        if (it == slots_.slots_.end())
 | 
			
		||||
            return {};
 | 
			
		||||
 | 
			
		||||
        auto r = std::
 | 
			
		||||
            unordered_map<id_t, reduce_relay::Slot<clock_type>::PeerInfo>();
 | 
			
		||||
        for (auto const& [id, info] : it->second.peers_)
 | 
			
		||||
            r.emplace(std::make_pair(id, info));
 | 
			
		||||
 | 
			
		||||
        return r;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::uint16_t
 | 
			
		||||
@@ -684,6 +658,13 @@ private:
 | 
			
		||||
        if (auto it = peers_.find(id); it != peers_.end())
 | 
			
		||||
            squelch_(validator, it->second, squelchDuration);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    squelchAll(PublicKey const& validator, std::uint32_t duration) override
 | 
			
		||||
    {
 | 
			
		||||
        for (auto const& [id, peer] : peers_)
 | 
			
		||||
            squelch_(validator, peer, duration);
 | 
			
		||||
    }
 | 
			
		||||
    void
 | 
			
		||||
    unsquelch(PublicKey const& validator, Peer::id_t id) const override
 | 
			
		||||
    {
 | 
			
		||||
@@ -874,8 +855,7 @@ public:
 | 
			
		||||
            for (auto& [_, v] : peers)
 | 
			
		||||
            {
 | 
			
		||||
                (void)_;
 | 
			
		||||
                if (std::get<reduce_relay::PeerState>(v) ==
 | 
			
		||||
                    reduce_relay::PeerState::Squelched)
 | 
			
		||||
                if (v.state == reduce_relay::PeerState::Squelched)
 | 
			
		||||
                    return false;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@@ -887,7 +867,7 @@ private:
 | 
			
		||||
    std::vector<Validator> validators_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class reduce_relay_test : public beast::unit_test::suite
 | 
			
		||||
class base_squelch_test : public beast::unit_test::suite
 | 
			
		||||
{
 | 
			
		||||
    using Slot = reduce_relay::Slot<ManualClock>;
 | 
			
		||||
    using id_t = Peer::id_t;
 | 
			
		||||
@@ -901,8 +881,7 @@ protected:
 | 
			
		||||
                  << "num peers " << (int)network_.overlay().getNumPeers()
 | 
			
		||||
                  << std::endl;
 | 
			
		||||
        for (auto& [k, v] : peers)
 | 
			
		||||
            std::cout << k << ":" << (int)std::get<reduce_relay::PeerState>(v)
 | 
			
		||||
                      << " ";
 | 
			
		||||
            std::cout << k << ":" << to_string(v.state) << " ";
 | 
			
		||||
        std::cout << std::endl;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -1074,7 +1053,10 @@ protected:
 | 
			
		||||
                        network_.overlay().isSelected(*event.key_, event.peer_);
 | 
			
		||||
                    auto peers = network_.overlay().getPeers(*event.key_);
 | 
			
		||||
                    auto d = reduce_relay::epoch<milliseconds>(now).count() -
 | 
			
		||||
                        std::get<3>(peers[event.peer_]);
 | 
			
		||||
                        reduce_relay::epoch<milliseconds>(
 | 
			
		||||
                            peers[event.peer_].lastMessage)
 | 
			
		||||
                            .count();
 | 
			
		||||
 | 
			
		||||
                    mustHandle = event.isSelected_ &&
 | 
			
		||||
                        d > milliseconds(reduce_relay::IDLED).count() &&
 | 
			
		||||
                        network_.overlay().inState(
 | 
			
		||||
@@ -1296,7 +1278,7 @@ protected:
 | 
			
		||||
                [&](PublicKey const& key, PeerWPtr const& peer) {
 | 
			
		||||
                    unsquelched++;
 | 
			
		||||
                });
 | 
			
		||||
            auto peers = network_.overlay().getPeers(network_.validator(0));
 | 
			
		||||
 | 
			
		||||
            BEAST_EXPECT(
 | 
			
		||||
                unsquelched ==
 | 
			
		||||
                MAX_PEERS -
 | 
			
		||||
@@ -1317,8 +1299,7 @@ protected:
 | 
			
		||||
            BEAST_EXPECT(propagateAndSquelch(log, true, false));
 | 
			
		||||
            auto peers = network_.overlay().getPeers(network_.validator(0));
 | 
			
		||||
            auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) {
 | 
			
		||||
                return std::get<reduce_relay::PeerState>(it.second) ==
 | 
			
		||||
                    reduce_relay::PeerState::Squelched;
 | 
			
		||||
                return it.second.state == reduce_relay::PeerState::Squelched;
 | 
			
		||||
            });
 | 
			
		||||
            assert(it != peers.end());
 | 
			
		||||
            std::uint16_t unsquelched = 0;
 | 
			
		||||
@@ -1514,7 +1495,7 @@ vp_base_squelch_max_selected_peers=2
 | 
			
		||||
            auto peers = network_.overlay().getPeers(network_.validator(0));
 | 
			
		||||
            // first message changes Slot state to Counting and is not counted,
 | 
			
		||||
            // hence '-1'.
 | 
			
		||||
            BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
 | 
			
		||||
            BEAST_EXPECT(peers[0].count == (nMessages - 1));
 | 
			
		||||
            // add duplicate
 | 
			
		||||
            uint256 key(nMessages - 1);
 | 
			
		||||
            network_.overlay().updateSlotAndSquelch(
 | 
			
		||||
@@ -1524,7 +1505,7 @@ vp_base_squelch_max_selected_peers=2
 | 
			
		||||
                [&](PublicKey const&, PeerWPtr, std::uint32_t) {});
 | 
			
		||||
            // confirm the same number of messages
 | 
			
		||||
            peers = network_.overlay().getPeers(network_.validator(0));
 | 
			
		||||
            BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
 | 
			
		||||
            BEAST_EXPECT(peers[0].count == (nMessages - 1));
 | 
			
		||||
            // advance the clock
 | 
			
		||||
            ManualClock::advance(reduce_relay::IDLED + seconds(1));
 | 
			
		||||
            network_.overlay().updateSlotAndSquelch(
 | 
			
		||||
@@ -1534,7 +1515,7 @@ vp_base_squelch_max_selected_peers=2
 | 
			
		||||
                [&](PublicKey const&, PeerWPtr, std::uint32_t) {});
 | 
			
		||||
            peers = network_.overlay().getPeers(network_.validator(0));
 | 
			
		||||
            // confirm message number increased
 | 
			
		||||
            BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
 | 
			
		||||
            BEAST_EXPECT(peers[0].count == nMessages);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -1550,6 +1531,12 @@ vp_base_squelch_max_selected_peers=2
 | 
			
		||||
            if (duration > maxDuration_)
 | 
			
		||||
                maxDuration_ = duration;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        void
 | 
			
		||||
        squelchAll(PublicKey const&, std::uint32_t) override
 | 
			
		||||
        {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        void
 | 
			
		||||
        unsquelch(PublicKey const&, Peer::id_t) const override
 | 
			
		||||
        {
 | 
			
		||||
@@ -1582,10 +1569,7 @@ vp_base_squelch_max_selected_peers=2
 | 
			
		||||
                        std::uint64_t mid = m * 1000 + peer;
 | 
			
		||||
                        uint256 const message{mid};
 | 
			
		||||
                        slots.updateSlotAndSquelch(
 | 
			
		||||
                            message,
 | 
			
		||||
                            validator,
 | 
			
		||||
                            peer,
 | 
			
		||||
                            protocol::MessageType::mtVALIDATION);
 | 
			
		||||
                            message, validator, peer, true);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                // make Slot's internal hash router expire all messages
 | 
			
		||||
@@ -1703,7 +1687,7 @@ vp_base_squelch_max_selected_peers=2
 | 
			
		||||
    Network network_;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    reduce_relay_test()
 | 
			
		||||
    base_squelch_test()
 | 
			
		||||
        : env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
 | 
			
		||||
            cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
 | 
			
		||||
            cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
 | 
			
		||||
@@ -1716,7 +1700,7 @@ public:
 | 
			
		||||
    void
 | 
			
		||||
    run() override
 | 
			
		||||
    {
 | 
			
		||||
        bool log = false;
 | 
			
		||||
        bool log = true;
 | 
			
		||||
        testConfig(log);
 | 
			
		||||
        testInitialRound(log);
 | 
			
		||||
        testPeerUnsquelchedTooSoon(log);
 | 
			
		||||
@@ -1732,7 +1716,7 @@ public:
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class reduce_relay_simulate_test : public reduce_relay_test
 | 
			
		||||
class base_squelch_simulate_test : public base_squelch_test
 | 
			
		||||
{
 | 
			
		||||
    void
 | 
			
		||||
    testRandom(bool log)
 | 
			
		||||
@@ -1748,8 +1732,8 @@ class reduce_relay_simulate_test : public reduce_relay_test
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple);
 | 
			
		||||
BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple);
 | 
			
		||||
BEAST_DEFINE_TESTSUITE(base_squelch, ripple_data, ripple);
 | 
			
		||||
BEAST_DEFINE_TESTSUITE_MANUAL(base_squelch_simulate, ripple_data, ripple);
 | 
			
		||||
 | 
			
		||||
}  // namespace test
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										83
									
								
								src/test/overlay/clock.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								src/test/overlay/clock.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,83 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of rippled: https://github.com/ripple/rippled
 | 
			
		||||
    Copyright (c) 2025 Ripple Labs Inc.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and/or distribute this software for any
 | 
			
		||||
    purpose  with  or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL ,  DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#ifndef RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED
 | 
			
		||||
#define RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED
 | 
			
		||||
 | 
			
		||||
#include <xrpl/basics/random.h>
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <ratio>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
 | 
			
		||||
namespace test {
 | 
			
		||||
 | 
			
		||||
using namespace std::chrono;
 | 
			
		||||
 | 
			
		||||
/** Manually advanced clock. */
 | 
			
		||||
class ManualClock
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
    typedef uint64_t rep;
 | 
			
		||||
    typedef std::milli period;
 | 
			
		||||
    typedef std::chrono::duration<std::uint32_t, period> duration;
 | 
			
		||||
    typedef std::chrono::time_point<ManualClock> time_point;
 | 
			
		||||
    inline static bool const is_steady = false;
 | 
			
		||||
 | 
			
		||||
    static void
 | 
			
		||||
    advance(duration d) noexcept
 | 
			
		||||
    {
 | 
			
		||||
        now_ += d;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static void
 | 
			
		||||
    randAdvance(milliseconds min, milliseconds max)
 | 
			
		||||
    {
 | 
			
		||||
        now_ += randDuration(min, max);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static void
 | 
			
		||||
    reset() noexcept
 | 
			
		||||
    {
 | 
			
		||||
        now_ = time_point(seconds(0));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static time_point
 | 
			
		||||
    now() noexcept
 | 
			
		||||
    {
 | 
			
		||||
        return now_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static duration
 | 
			
		||||
    randDuration(milliseconds min, milliseconds max)
 | 
			
		||||
    {
 | 
			
		||||
        return duration(milliseconds(rand_int(min.count(), max.count())));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    explicit ManualClock() = default;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    inline static time_point now_ = time_point(seconds(0));
 | 
			
		||||
};
 | 
			
		||||
}  // namespace test
 | 
			
		||||
}  // namespace ripple
 | 
			
		||||
 | 
			
		||||
#endif
 | 
			
		||||
							
								
								
									
										759
									
								
								src/test/overlay/enhanced_squelch_test.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										759
									
								
								src/test/overlay/enhanced_squelch_test.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,759 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of rippled: https://github.com/ripple/rippled
 | 
			
		||||
    Copyright (c) 2025 Ripple Labs Inc.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and/or distribute this software for any
 | 
			
		||||
    purpose  with  or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL ,  DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include <test/jtx/Env.h>
 | 
			
		||||
 | 
			
		||||
#include <xrpld/overlay/Slot.h>
 | 
			
		||||
 | 
			
		||||
#include <xrpl/beast/unit_test.h>
 | 
			
		||||
#include <xrpl/protocol/SecretKey.h>
 | 
			
		||||
 | 
			
		||||
#include "test/overlay/clock.h"
 | 
			
		||||
#include "xrpld/overlay/Peer.h"
 | 
			
		||||
#include "xrpld/overlay/ReduceRelayCommon.h"
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
namespace test {
 | 
			
		||||
 | 
			
		||||
class TestHandler : public reduce_relay::SquelchHandler
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
    using squelch_method =
 | 
			
		||||
        std::function<void(PublicKey const&, Peer::id_t, std::uint32_t)>;
 | 
			
		||||
    using squelchAll_method =
 | 
			
		||||
        std::function<void(PublicKey const&, std::uint32_t)>;
 | 
			
		||||
    using unsquelch_method = std::function<void(PublicKey const&, Peer::id_t)>;
 | 
			
		||||
 | 
			
		||||
    squelch_method squelch_f_;
 | 
			
		||||
    squelchAll_method squelchAll_f_;
 | 
			
		||||
    unsquelch_method unsquelch_f_;
 | 
			
		||||
 | 
			
		||||
    TestHandler(
 | 
			
		||||
        squelch_method const& squelch_f,
 | 
			
		||||
        squelchAll_method const& squelchAll_f,
 | 
			
		||||
        unsquelch_method const& unsquelch_f)
 | 
			
		||||
        : squelch_f_(squelch_f)
 | 
			
		||||
        , squelchAll_f_(squelchAll_f)
 | 
			
		||||
        , unsquelch_f_(unsquelch_f)
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    TestHandler(TestHandler& copy)
 | 
			
		||||
    {
 | 
			
		||||
        squelch_f_ = copy.squelch_f_;
 | 
			
		||||
        squelchAll_f_ = copy.squelchAll_f_;
 | 
			
		||||
        unsquelch_f_ = copy.unsquelch_f_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    squelch(PublicKey const& validator, Peer::id_t peer, std::uint32_t duration)
 | 
			
		||||
        const override
 | 
			
		||||
    {
 | 
			
		||||
        squelch_f_(validator, peer, duration);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    squelchAll(PublicKey const& validator, std::uint32_t duration) override
 | 
			
		||||
    {
 | 
			
		||||
        squelchAll_f_(validator, duration);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    unsquelch(PublicKey const& validator, Peer::id_t peer) const override
 | 
			
		||||
    {
 | 
			
		||||
        unsquelch_f_(validator, peer);
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class enhanced_squelch_test : public beast::unit_test::suite
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
    TestHandler::squelch_method noop_squelch =
 | 
			
		||||
        [&](PublicKey const&, Peer::id_t, std::uint32_t) {
 | 
			
		||||
            BEAST_EXPECTS(false, "unexpected call to squelch handler");
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
    TestHandler::squelchAll_method noop_squelchAll = [&](PublicKey const&,
 | 
			
		||||
                                                         std::uint32_t) {
 | 
			
		||||
        BEAST_EXPECTS(false, "unexpected call to squelchAll handler");
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    TestHandler::unsquelch_method noop_unsquelch = [&](PublicKey const&,
 | 
			
		||||
                                                       Peer::id_t) {
 | 
			
		||||
        BEAST_EXPECTS(false, "unexpected call to unsquelch handler");
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    // noop_handler is passed as a place holder Handler to slots
 | 
			
		||||
    TestHandler noop_handler = {
 | 
			
		||||
        noop_squelch,
 | 
			
		||||
        noop_squelchAll,
 | 
			
		||||
        noop_unsquelch,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    jtx::Env env_;
 | 
			
		||||
 | 
			
		||||
    enhanced_squelch_test() : env_(*this)
 | 
			
		||||
    {
 | 
			
		||||
        env_.app().config().VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testConfig()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("Test Config - enabled enhanced squelching");
 | 
			
		||||
        Config c;
 | 
			
		||||
 | 
			
		||||
        std::string toLoad(R"rippleConfig(
 | 
			
		||||
[reduce_relay]
 | 
			
		||||
vp_enhanced_squelch_enable=1
 | 
			
		||||
)rippleConfig");
 | 
			
		||||
 | 
			
		||||
        c.loadFromString(toLoad);
 | 
			
		||||
        BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == true);
 | 
			
		||||
 | 
			
		||||
        toLoad = R"rippleConfig(
 | 
			
		||||
[reduce_relay]
 | 
			
		||||
vp_enhanced_squelch_enable=0
 | 
			
		||||
)rippleConfig";
 | 
			
		||||
 | 
			
		||||
        c.loadFromString(toLoad);
 | 
			
		||||
        BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
 | 
			
		||||
 | 
			
		||||
        toLoad = R"rippleConfig(
 | 
			
		||||
[reduce_relay]
 | 
			
		||||
)rippleConfig";
 | 
			
		||||
 | 
			
		||||
        c.loadFromString(toLoad);
 | 
			
		||||
        BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Tests tracking for squelched validators and peers */
 | 
			
		||||
    void
 | 
			
		||||
    testSquelchTracking()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("squelchTracking");
 | 
			
		||||
        Peer::id_t squelchedPeerID = 0;
 | 
			
		||||
        Peer::id_t newPeerID = 1;
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
        auto const publicKey = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
 | 
			
		||||
        // a new key should not be squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.validatorSquelched(publicKey), "validator squelched");
 | 
			
		||||
 | 
			
		||||
        slots.squelchValidator(publicKey, squelchedPeerID);
 | 
			
		||||
 | 
			
		||||
        // after squelching a peer, the validator must be squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.validatorSquelched(publicKey), "validator not squelched");
 | 
			
		||||
 | 
			
		||||
        // the peer must also be squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.peerSquelched(publicKey, squelchedPeerID),
 | 
			
		||||
            "peer not squelched");
 | 
			
		||||
 | 
			
		||||
        // a new peer must not be squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.peerSquelched(publicKey, newPeerID), "new peer squelched");
 | 
			
		||||
 | 
			
		||||
        // advance the manual clock to after expiration
 | 
			
		||||
        ManualClock::advance(
 | 
			
		||||
            reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
 | 
			
		||||
            std::chrono::seconds{11});
 | 
			
		||||
 | 
			
		||||
        // validator squelch should expire
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.validatorSquelched(publicKey),
 | 
			
		||||
            "validator squelched after expiry");
 | 
			
		||||
 | 
			
		||||
        // peer squelch should also expire
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.peerSquelched(publicKey, squelchedPeerID),
 | 
			
		||||
            "validator squelched after expiry");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateValidatorSlot_newValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("updateValidatorSlot_newValidator");
 | 
			
		||||
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        Peer::id_t const peerID = 1;
 | 
			
		||||
        auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        uint256 message{0};
 | 
			
		||||
 | 
			
		||||
        slots.updateValidatorSlot(message, validator, peerID);
 | 
			
		||||
 | 
			
		||||
        // adding untrusted slot does not effect trusted slots
 | 
			
		||||
        BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed");
 | 
			
		||||
 | 
			
		||||
        // we expect that the validator was not added to untrusted slots
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.untrusted_slots_.size() == 0, "untrusted slot changed");
 | 
			
		||||
 | 
			
		||||
        // we expect that the validator was added to th consideration list
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.contains(validator),
 | 
			
		||||
            "new validator was not considered");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateValidatorSlot_squelchedValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("testUpdateValidatorSlot_squelchedValidator");
 | 
			
		||||
 | 
			
		||||
        Peer::id_t squelchedPeerID = 0;
 | 
			
		||||
        Peer::id_t newPeerID = 1;
 | 
			
		||||
        auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
 | 
			
		||||
        TestHandler::squelch_method const squelch_f =
 | 
			
		||||
            [&](PublicKey const& key, Peer::id_t id, std::uint32_t duration) {
 | 
			
		||||
                BEAST_EXPECTS(
 | 
			
		||||
                    key == validator,
 | 
			
		||||
                    "squelch called for unknown validator key");
 | 
			
		||||
 | 
			
		||||
                BEAST_EXPECTS(
 | 
			
		||||
                    id == newPeerID, "squelch called for the wrong peer");
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
        TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch};
 | 
			
		||||
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        slots.squelchValidator(validator, squelchedPeerID);
 | 
			
		||||
 | 
			
		||||
        // this should not trigger squelch assertions, the peer is squelched
 | 
			
		||||
        slots.updateValidatorSlot(
 | 
			
		||||
            sha512Half(validator), validator, squelchedPeerID);
 | 
			
		||||
 | 
			
		||||
        slots.updateValidatorSlot(sha512Half(validator), validator, newPeerID);
 | 
			
		||||
 | 
			
		||||
        // the squelched peer remained squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.peerSquelched(validator, squelchedPeerID),
 | 
			
		||||
            "peer not squelched");
 | 
			
		||||
 | 
			
		||||
        // because the validator was squelched, the new peer was also squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.peerSquelched(validator, newPeerID),
 | 
			
		||||
            "new peer was not squelched");
 | 
			
		||||
 | 
			
		||||
        // a squelched validator must not be considered
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.considered_validators_.contains(validator),
 | 
			
		||||
            "squelched validator was added for consideration");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateValidatorSlot_slotsFull()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("updateValidatorSlot_slotsFull");
 | 
			
		||||
        Peer::id_t const peerID = 1;
 | 
			
		||||
 | 
			
		||||
        // while there are open untrusted slots, no calls should be made to
 | 
			
		||||
        // squelch any validators
 | 
			
		||||
        TestHandler handler{noop_handler};
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // saturate validator slots
 | 
			
		||||
        auto const validators = fillUntrustedSlots(slots);
 | 
			
		||||
 | 
			
		||||
        // adding untrusted slot does not effect trusted slots
 | 
			
		||||
        BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed");
 | 
			
		||||
 | 
			
		||||
        // simulate additional messages from already selected validators
 | 
			
		||||
        for (auto const& validator : validators)
 | 
			
		||||
            for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD; ++i)
 | 
			
		||||
                slots.updateValidatorSlot(
 | 
			
		||||
                    sha512Half(validator) + static_cast<uint256>(i),
 | 
			
		||||
                    validator,
 | 
			
		||||
                    peerID);
 | 
			
		||||
 | 
			
		||||
        // an untrusted slot was added for each validator
 | 
			
		||||
        BEAST_EXPECT(
 | 
			
		||||
            slots.untrusted_slots_.size() ==
 | 
			
		||||
            env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
 | 
			
		||||
 | 
			
		||||
        for (auto const& validator : validators)
 | 
			
		||||
            BEAST_EXPECTS(
 | 
			
		||||
                !slots.validatorSquelched(validator),
 | 
			
		||||
                "selected validator was squelched");
 | 
			
		||||
 | 
			
		||||
        auto const newValidator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
 | 
			
		||||
        // once slots are full squelchAll must be called for new peer/validator
 | 
			
		||||
        handler.squelchAll_f_ = [&](PublicKey const& key, std::uint32_t) {
 | 
			
		||||
            BEAST_EXPECTS(
 | 
			
		||||
                key == newValidator, "unexpected validator squelched");
 | 
			
		||||
            slots.squelchValidator(key, peerID);
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        slots.updateValidatorSlot(
 | 
			
		||||
            sha512Half(newValidator), newValidator, peerID);
 | 
			
		||||
 | 
			
		||||
        // Once the slots are saturated every other validator is squelched
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.validatorSquelched(newValidator),
 | 
			
		||||
            "untrusted validator not squelched");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.peerSquelched(newValidator, peerID),
 | 
			
		||||
            "peer for untrusted validator not squelched");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testDeleteIdlePeers_deleteIdleSlots()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("deleteIdlePeers");
 | 
			
		||||
        TestHandler handler{noop_handler};
 | 
			
		||||
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), handler, env_.app().config());
 | 
			
		||||
        auto keys = fillUntrustedSlots(slots);
 | 
			
		||||
 | 
			
		||||
        //  verify that squelchAll is called for each idled slot validator
 | 
			
		||||
        handler.squelchAll_f_ = [&](PublicKey const& actualKey,
 | 
			
		||||
                                    std::uint32_t duration) {
 | 
			
		||||
            for (auto it = keys.begin(); it != keys.end(); ++it)
 | 
			
		||||
            {
 | 
			
		||||
                if (*it == actualKey)
 | 
			
		||||
                {
 | 
			
		||||
                    keys.erase(it);
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            BEAST_EXPECTS(false, "unexpected key passed to squelchAll");
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.untrusted_slots_.size() ==
 | 
			
		||||
                env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
 | 
			
		||||
            "unexpected number of untrusted slots");
 | 
			
		||||
 | 
			
		||||
        // advance the manual clock to after slot expiration
 | 
			
		||||
        ManualClock::advance(
 | 
			
		||||
            reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
 | 
			
		||||
            std::chrono::seconds{1});
 | 
			
		||||
 | 
			
		||||
        slots.deleteIdlePeers();
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.untrusted_slots_.size() == 0,
 | 
			
		||||
            "unexpected number of untrusted slots");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(keys.empty(), "not all validators were squelched");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testDeleteIdlePeers_deleteIdleUntrustedPeer()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("deleteIdleUntrustedPeer");
 | 
			
		||||
        Peer::id_t const peerID = 1;
 | 
			
		||||
        Peer::id_t const peerID2 = 2;
 | 
			
		||||
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // fill one untrustd validator slot
 | 
			
		||||
        auto const validator = fillUntrustedSlots(slots, 1)[0];
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.untrusted_slots_.size() == 1,
 | 
			
		||||
            "unexpected number of untrusted slots");
 | 
			
		||||
 | 
			
		||||
        slots.updateSlotAndSquelch(
 | 
			
		||||
            sha512Half(validator) + static_cast<uint256>(100),
 | 
			
		||||
            validator,
 | 
			
		||||
            peerID,
 | 
			
		||||
            false);
 | 
			
		||||
 | 
			
		||||
        slots.updateSlotAndSquelch(
 | 
			
		||||
            sha512Half(validator) + static_cast<uint256>(100),
 | 
			
		||||
            validator,
 | 
			
		||||
            peerID2,
 | 
			
		||||
            false);
 | 
			
		||||
 | 
			
		||||
        slots.deletePeer(peerID, true);
 | 
			
		||||
 | 
			
		||||
        auto const slotPeers = getUntrustedSlotPeers(validator, slots);
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slotPeers.size() == 1, "untrusted validator slot is missing");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slotPeers.contains(peerID),
 | 
			
		||||
            "peer was not removed from untrusted slots");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slotPeers.contains(peerID2),
 | 
			
		||||
            "peer was removed from untrusted slots");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Test that untrusted validator slots are correctly updated by
 | 
			
		||||
     * updateSlotAndSquelch
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateSlotAndSquelch_untrustedValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("updateUntrsutedValidatorSlot");
 | 
			
		||||
        TestHandler handler{noop_handler};
 | 
			
		||||
 | 
			
		||||
        handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {};
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // peers that will be source of validator messages
 | 
			
		||||
        std::vector<Peer::id_t> peers = {};
 | 
			
		||||
 | 
			
		||||
        // prepare n+1 peers, we expect the n+1st peer will be squelched
 | 
			
		||||
        for (int i = 0; i <
 | 
			
		||||
             env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + 1;
 | 
			
		||||
             ++i)
 | 
			
		||||
            peers.push_back(i);
 | 
			
		||||
 | 
			
		||||
        auto const validator = fillUntrustedSlots(slots, 1)[0];
 | 
			
		||||
 | 
			
		||||
        // Squelching logic resets all counters each time a new peer is added
 | 
			
		||||
        // Therfore we need to populate counters for each peer before sending
 | 
			
		||||
        // new messages
 | 
			
		||||
        for (auto const& peer : peers)
 | 
			
		||||
        {
 | 
			
		||||
            auto const now = ManualClock::now();
 | 
			
		||||
            slots.updateSlotAndSquelch(
 | 
			
		||||
                sha512Half(validator) +
 | 
			
		||||
                    static_cast<uint256>(now.time_since_epoch().count()),
 | 
			
		||||
                validator,
 | 
			
		||||
                peer,
 | 
			
		||||
                false);
 | 
			
		||||
 | 
			
		||||
            ManualClock::advance(std::chrono::milliseconds{10});
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // simulate new, unique validator messages sent by peers
 | 
			
		||||
        for (auto const& peer : peers)
 | 
			
		||||
            for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
 | 
			
		||||
            {
 | 
			
		||||
                auto const now = ManualClock::now();
 | 
			
		||||
                slots.updateSlotAndSquelch(
 | 
			
		||||
                    sha512Half(validator) +
 | 
			
		||||
                        static_cast<uint256>(now.time_since_epoch().count()),
 | 
			
		||||
                    validator,
 | 
			
		||||
                    peer,
 | 
			
		||||
                    false);
 | 
			
		||||
 | 
			
		||||
                ManualClock::advance(std::chrono::milliseconds{10});
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        auto const slotPeers = getUntrustedSlotPeers(validator, slots);
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slotPeers.size() ==
 | 
			
		||||
                env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS +
 | 
			
		||||
                    1,
 | 
			
		||||
            "untrusted validator slot is missing");
 | 
			
		||||
 | 
			
		||||
        int selected = 0;
 | 
			
		||||
        int squelched = 0;
 | 
			
		||||
        for (auto const& [_, info] : slotPeers)
 | 
			
		||||
        {
 | 
			
		||||
            switch (info.state)
 | 
			
		||||
            {
 | 
			
		||||
                case reduce_relay::PeerState::Selected:
 | 
			
		||||
                    ++selected;
 | 
			
		||||
                    break;
 | 
			
		||||
                case reduce_relay::PeerState::Squelched:
 | 
			
		||||
                    ++squelched;
 | 
			
		||||
                    break;
 | 
			
		||||
                case reduce_relay::PeerState::Counting:
 | 
			
		||||
                    BEAST_EXPECTS(
 | 
			
		||||
                        false, "peer should not be in counting state");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(squelched == 1, "expected one squelched peer");
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            selected ==
 | 
			
		||||
                env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
 | 
			
		||||
            "wrong number of peers selected");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateConsideredValidator_newValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("testUpdateConsideredValidator_newValidator");
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // insert some random validator key
 | 
			
		||||
        auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        Peer::id_t const peerID = 0;
 | 
			
		||||
        Peer::id_t const peerID2 = 1;
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.contains(validator),
 | 
			
		||||
            "new validator was not added for consideration");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        // expect that a peer will be registered once as a message source
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.at(validator).peers.size() == 1,
 | 
			
		||||
            "duplicate peer was registered");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID2),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        // expect that each distinct peer will be registered
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.at(validator).peers.size() == 2,
 | 
			
		||||
            "distinct peers were not registered");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateConsideredValidator_idleValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("testUpdateConsideredValidator_idleValidator");
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // insert some random validator key
 | 
			
		||||
        auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        Peer::id_t peerID = 0;
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.contains(validator),
 | 
			
		||||
            "new validator was not added for consideration");
 | 
			
		||||
 | 
			
		||||
        auto const state = slots.considered_validators_.at(validator);
 | 
			
		||||
 | 
			
		||||
        // simulate a validator sending a new message before the idle timer
 | 
			
		||||
        ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
        auto const newState = slots.considered_validators_.at(validator);
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            state.count + 1 == newState.count,
 | 
			
		||||
            "non-idling validator was updated");
 | 
			
		||||
 | 
			
		||||
        // simulate a validator idling
 | 
			
		||||
        ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1));
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        auto const idleState = slots.considered_validators_.at(validator);
 | 
			
		||||
        // we expect that an idling validator will not be updated
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            newState.count == idleState.count, "idling validator was updated");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testUpdateConsideredValidator_selectQualifyingValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("testUpdateConsideredValidator_selectQualifyingValidator");
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // insert some random validator key
 | 
			
		||||
        auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        auto const validator2 = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        Peer::id_t peerID = 0;
 | 
			
		||||
        Peer::id_t peerID2 =
 | 
			
		||||
            env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
 | 
			
		||||
 | 
			
		||||
        // a validator that sends only unique messages, but only from one peer
 | 
			
		||||
        // must not be selected
 | 
			
		||||
        for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
 | 
			
		||||
        {
 | 
			
		||||
            BEAST_EXPECTS(
 | 
			
		||||
                !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
                "validator was selected before reaching message threshold");
 | 
			
		||||
            BEAST_EXPECTS(
 | 
			
		||||
                !slots.updateConsideredValidator(validator2, peerID),
 | 
			
		||||
                "validator was selected before reaching message threshold");
 | 
			
		||||
 | 
			
		||||
            ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
 | 
			
		||||
        }
 | 
			
		||||
        // as long as the peer criteria is not met, the validator most not be
 | 
			
		||||
        // selected
 | 
			
		||||
        for (int i = 1; i <
 | 
			
		||||
             env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS - 1;
 | 
			
		||||
             ++i)
 | 
			
		||||
        {
 | 
			
		||||
            BEAST_EXPECTS(
 | 
			
		||||
                !slots.updateConsideredValidator(validator, i),
 | 
			
		||||
                "validator was selected before reaching enough peers");
 | 
			
		||||
            BEAST_EXPECTS(
 | 
			
		||||
                !slots.updateConsideredValidator(validator2, i),
 | 
			
		||||
                "validator was selected before reaching enough peers");
 | 
			
		||||
 | 
			
		||||
            ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        auto const consideredValidator =
 | 
			
		||||
            slots.updateConsideredValidator(validator, peerID2);
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            consideredValidator && *consideredValidator == validator,
 | 
			
		||||
            "expected validator was not selected");
 | 
			
		||||
 | 
			
		||||
        // expect that selected peer was removed
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.considered_validators_.contains(validator),
 | 
			
		||||
            "selected validator was not removed from considered list");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.contains(validator2),
 | 
			
		||||
            "unqualified validator was removed from considered list");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    testCleanConsideredValidators_deleteIdleValidator()
 | 
			
		||||
    {
 | 
			
		||||
        testcase("cleanConsideredValidators_deleteIdleValidator");
 | 
			
		||||
 | 
			
		||||
        reduce_relay::Slots<ManualClock> slots(
 | 
			
		||||
            env_.app().logs(), noop_handler, env_.app().config());
 | 
			
		||||
 | 
			
		||||
        // insert some random validator key
 | 
			
		||||
        auto const lateValidator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
        Peer::id_t peerID = 0;
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(lateValidator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.contains(lateValidator),
 | 
			
		||||
            "new validator was not added for consideration");
 | 
			
		||||
 | 
			
		||||
        // simulate a validator idling
 | 
			
		||||
        ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1));
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.updateConsideredValidator(validator, peerID),
 | 
			
		||||
            "validator was selected with insufficient number of peers");
 | 
			
		||||
 | 
			
		||||
        auto const invalidValidators = slots.cleanConsideredValidators();
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            invalidValidators.size() == 1,
 | 
			
		||||
            "unexpected number of invalid validators");
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            invalidValidators[0] == lateValidator, "removed invalid validator");
 | 
			
		||||
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            !slots.considered_validators_.contains(lateValidator),
 | 
			
		||||
            "late validator was not removed");
 | 
			
		||||
        BEAST_EXPECTS(
 | 
			
		||||
            slots.considered_validators_.contains(validator),
 | 
			
		||||
            "timely validator was removed");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    /** A helper method to fill untrusted slots of a given Slots instance
 | 
			
		||||
     * with random validator messages*/
 | 
			
		||||
    std::vector<PublicKey>
 | 
			
		||||
    fillUntrustedSlots(
 | 
			
		||||
        reduce_relay::Slots<ManualClock>& slots,
 | 
			
		||||
        int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS)
 | 
			
		||||
    {
 | 
			
		||||
        std::vector<PublicKey> keys;
 | 
			
		||||
        for (int i = 0; i < maxSlots; ++i)
 | 
			
		||||
        {
 | 
			
		||||
            auto const validator = randomKeyPair(KeyType::ed25519).first;
 | 
			
		||||
            keys.push_back(validator);
 | 
			
		||||
            for (int j = 0; j <
 | 
			
		||||
                 env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
 | 
			
		||||
                 ++j)
 | 
			
		||||
                // send enough messages so that a validator slot is selected
 | 
			
		||||
                for (int k = 0; k < reduce_relay::MAX_MESSAGE_THRESHOLD; ++k)
 | 
			
		||||
                    slots.updateValidatorSlot(
 | 
			
		||||
                        sha512Half(validator) + static_cast<uint256>(k),
 | 
			
		||||
                        validator,
 | 
			
		||||
                        j);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return keys;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::unordered_map<Peer::id_t, reduce_relay::Slot<ManualClock>::PeerInfo>
 | 
			
		||||
    getUntrustedSlotPeers(
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        reduce_relay::Slots<ManualClock> const& slots)
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots.untrusted_slots_.find(validator);
 | 
			
		||||
        if (it == slots.untrusted_slots_.end())
 | 
			
		||||
            return {};
 | 
			
		||||
 | 
			
		||||
        auto r = std::unordered_map<
 | 
			
		||||
            Peer::id_t,
 | 
			
		||||
            reduce_relay::Slot<ManualClock>::PeerInfo>();
 | 
			
		||||
 | 
			
		||||
        for (auto const& [id, info] : it->second.peers_)
 | 
			
		||||
            r.emplace(std::make_pair(id, info));
 | 
			
		||||
 | 
			
		||||
        return r;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    run() override
 | 
			
		||||
    {
 | 
			
		||||
        testConfig();
 | 
			
		||||
        testSquelchTracking();
 | 
			
		||||
        testUpdateValidatorSlot_newValidator();
 | 
			
		||||
        testUpdateValidatorSlot_slotsFull();
 | 
			
		||||
        testUpdateValidatorSlot_squelchedValidator();
 | 
			
		||||
        testDeleteIdlePeers_deleteIdleSlots();
 | 
			
		||||
        testDeleteIdlePeers_deleteIdleUntrustedPeer();
 | 
			
		||||
        testUpdateSlotAndSquelch_untrustedValidator();
 | 
			
		||||
        testUpdateConsideredValidator_newValidator();
 | 
			
		||||
        testUpdateConsideredValidator_idleValidator();
 | 
			
		||||
        testUpdateConsideredValidator_selectQualifyingValidator();
 | 
			
		||||
        testCleanConsideredValidators_deleteIdleValidator();
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
BEAST_DEFINE_TESTSUITE(enhanced_squelch, overlay, ripple);
 | 
			
		||||
 | 
			
		||||
}  // namespace test
 | 
			
		||||
 | 
			
		||||
}  // namespace ripple
 | 
			
		||||
@@ -254,6 +254,9 @@ public:
 | 
			
		||||
    std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5;
 | 
			
		||||
    /////////////////    END OF TEMPORARY CODE BLOCK    /////////////////////
 | 
			
		||||
 | 
			
		||||
    // Enable enhanced squelching of unique untrusted validator messages
 | 
			
		||||
    bool VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = false;
 | 
			
		||||
 | 
			
		||||
    // Transaction reduce-relay feature
 | 
			
		||||
    bool TX_REDUCE_RELAY_ENABLE = false;
 | 
			
		||||
    // If tx reduce-relay feature is disabled
 | 
			
		||||
 
 | 
			
		||||
@@ -775,6 +775,9 @@ Config::loadFromString(std::string const& fileContents)
 | 
			
		||||
                "greater than or equal to 3");
 | 
			
		||||
        /////////////////  !!END OF TEMPORARY CODE BLOCK!! /////////////////////
 | 
			
		||||
 | 
			
		||||
        VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE =
 | 
			
		||||
            sec.value_or("vp_enhanced_squelch_enable", false);
 | 
			
		||||
 | 
			
		||||
        TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
 | 
			
		||||
        TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
 | 
			
		||||
        TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);
 | 
			
		||||
 
 | 
			
		||||
@@ -40,7 +40,7 @@ static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600};
 | 
			
		||||
static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10);
 | 
			
		||||
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600};
 | 
			
		||||
// No message received threshold before identifying a peer as idled
 | 
			
		||||
static constexpr auto IDLED = std::chrono::seconds{8};
 | 
			
		||||
static constexpr auto IDLED = std::chrono::seconds{5};
 | 
			
		||||
// Message count threshold to start selecting peers as the source
 | 
			
		||||
// of messages from the validator. We add peers who reach
 | 
			
		||||
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
 | 
			
		||||
@@ -49,6 +49,8 @@ static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19;
 | 
			
		||||
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20;
 | 
			
		||||
// Max selected peers to choose as the source of messages from validator
 | 
			
		||||
static constexpr uint16_t MAX_SELECTED_PEERS = 5;
 | 
			
		||||
// Max number of untrusted slots the server will maintain
 | 
			
		||||
static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5;
 | 
			
		||||
// Wait before reduce-relay feature is enabled on boot up to let
 | 
			
		||||
// the server establish peer connections
 | 
			
		||||
static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};
 | 
			
		||||
 
 | 
			
		||||
@@ -29,17 +29,25 @@
 | 
			
		||||
#include <xrpl/basics/random.h>
 | 
			
		||||
#include <xrpl/beast/container/aged_unordered_map.h>
 | 
			
		||||
#include <xrpl/beast/utility/Journal.h>
 | 
			
		||||
#include <xrpl/beast/utility/PropertyStream.h>
 | 
			
		||||
#include <xrpl/protocol/PublicKey.h>
 | 
			
		||||
#include <xrpl/protocol/messages.h>
 | 
			
		||||
 | 
			
		||||
#include <algorithm>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstddef>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <set>
 | 
			
		||||
#include <tuple>
 | 
			
		||||
#include <unordered_map>
 | 
			
		||||
#include <unordered_set>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
// used to make private members of Slots class accessible for testing
 | 
			
		||||
namespace test {
 | 
			
		||||
class enhanced_squelch_test;
 | 
			
		||||
class base_squelch_test;
 | 
			
		||||
class OverlaySim;
 | 
			
		||||
}  // namespace test
 | 
			
		||||
 | 
			
		||||
namespace reduce_relay {
 | 
			
		||||
 | 
			
		||||
@@ -52,12 +60,42 @@ enum class PeerState : uint8_t {
 | 
			
		||||
    Selected,   // selected to relay, counting if Slot in Counting
 | 
			
		||||
    Squelched,  // squelched, doesn't relay
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
inline std::string
 | 
			
		||||
to_string(PeerState state)
 | 
			
		||||
{
 | 
			
		||||
    switch (state)
 | 
			
		||||
    {
 | 
			
		||||
        case PeerState::Counting:
 | 
			
		||||
            return "counting";
 | 
			
		||||
        case PeerState::Selected:
 | 
			
		||||
            return "selected";
 | 
			
		||||
        case PeerState::Squelched:
 | 
			
		||||
            return "squelched";
 | 
			
		||||
        default:
 | 
			
		||||
            return "unknown";
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
/** Slot's State */
 | 
			
		||||
enum class SlotState : uint8_t {
 | 
			
		||||
    Counting,  // counting messages
 | 
			
		||||
    Selected,  // peers selected, stop counting
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
inline std::string
 | 
			
		||||
to_string(SlotState state)
 | 
			
		||||
{
 | 
			
		||||
    switch (state)
 | 
			
		||||
    {
 | 
			
		||||
        case SlotState::Counting:
 | 
			
		||||
            return "counting";
 | 
			
		||||
        case SlotState::Selected:
 | 
			
		||||
            return "selected";
 | 
			
		||||
        default:
 | 
			
		||||
            return "unknown";
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename Unit, typename TP>
 | 
			
		||||
Unit
 | 
			
		||||
epoch(TP const& t)
 | 
			
		||||
@@ -75,7 +113,7 @@ public:
 | 
			
		||||
    virtual ~SquelchHandler()
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
    /** Squelch handler
 | 
			
		||||
    /** Squelch handler for a single peer
 | 
			
		||||
     * @param validator Public key of the source validator
 | 
			
		||||
     * @param id Peer's id to squelch
 | 
			
		||||
     * @param duration Squelch duration in seconds
 | 
			
		||||
@@ -83,6 +121,15 @@ public:
 | 
			
		||||
    virtual void
 | 
			
		||||
    squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration)
 | 
			
		||||
        const = 0;
 | 
			
		||||
 | 
			
		||||
    /** Squelch for all peers, the method must call slots.squelchValidator
 | 
			
		||||
     * to register that a (validator,peer) was squelched
 | 
			
		||||
     * @param validator Public key of the source validator
 | 
			
		||||
     * @param duration Squelch duration in seconds
 | 
			
		||||
     */
 | 
			
		||||
    virtual void
 | 
			
		||||
    squelchAll(PublicKey const& validator, std::uint32_t duration) = 0;
 | 
			
		||||
 | 
			
		||||
    /** Unsquelch handler
 | 
			
		||||
     * @param validator Public key of the source validator
 | 
			
		||||
     * @param id Peer's id to unsquelch
 | 
			
		||||
@@ -104,8 +151,10 @@ public:
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
class Slot final
 | 
			
		||||
{
 | 
			
		||||
private:
 | 
			
		||||
    friend class Slots<clock_type>;
 | 
			
		||||
    friend class test::enhanced_squelch_test;
 | 
			
		||||
    friend class test::OverlaySim;
 | 
			
		||||
 | 
			
		||||
    using id_t = Peer::id_t;
 | 
			
		||||
    using time_point = typename clock_type::time_point;
 | 
			
		||||
 | 
			
		||||
@@ -121,13 +170,15 @@ private:
 | 
			
		||||
    Slot(
 | 
			
		||||
        SquelchHandler const& handler,
 | 
			
		||||
        beast::Journal journal,
 | 
			
		||||
        uint16_t maxSelectedPeers)
 | 
			
		||||
        uint16_t maxSelectedPeers,
 | 
			
		||||
        bool isTrusted)
 | 
			
		||||
        : reachedThreshold_(0)
 | 
			
		||||
        , lastSelected_(clock_type::now())
 | 
			
		||||
        , state_(SlotState::Counting)
 | 
			
		||||
        , handler_(handler)
 | 
			
		||||
        , journal_(journal)
 | 
			
		||||
        , maxSelectedPeers_(maxSelectedPeers)
 | 
			
		||||
        , isTrusted_(isTrusted)
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -139,22 +190,19 @@ private:
 | 
			
		||||
     * MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
 | 
			
		||||
     * number of considered peers who reached MAX_MESSAGE_THRESHOLD is
 | 
			
		||||
     * maxSelectedPeers_ then randomly select maxSelectedPeers_ from
 | 
			
		||||
     * considered peers, and call squelch handler for each peer, which is not
 | 
			
		||||
     * selected and not already in Squelched state. Set the state for those
 | 
			
		||||
     * peers to Squelched and reset the count of all peers. Set slot's state to
 | 
			
		||||
     * Selected. Message count is not updated when the slot is in Selected
 | 
			
		||||
     * state.
 | 
			
		||||
     * considered peers, and call squelch handler for each peer, which is
 | 
			
		||||
     * not selected and not already in Squelched state. Set the state for
 | 
			
		||||
     * those peers to Squelched and reset the count of all peers. Set slot's
 | 
			
		||||
     * state to Selected. Message count is not updated when the slot is in
 | 
			
		||||
     * Selected state.
 | 
			
		||||
     * @param validator Public key of the source validator
 | 
			
		||||
     * @param id Peer id which received the message
 | 
			
		||||
     * @param type  Message type (Validation and Propose Set only,
 | 
			
		||||
     *     others are ignored, future use)
 | 
			
		||||
     * @param callback A callback to report ignored squelches
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    update(
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        id_t id,
 | 
			
		||||
        protocol::MessageType type,
 | 
			
		||||
        ignored_squelch_callback callback);
 | 
			
		||||
 | 
			
		||||
    /** Handle peer deletion when a peer disconnects.
 | 
			
		||||
@@ -177,32 +225,6 @@ private:
 | 
			
		||||
        return lastSelected_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Return number of peers in state */
 | 
			
		||||
    std::uint16_t
 | 
			
		||||
    inState(PeerState state) const;
 | 
			
		||||
 | 
			
		||||
    /** Return number of peers not in state */
 | 
			
		||||
    std::uint16_t
 | 
			
		||||
    notInState(PeerState state) const;
 | 
			
		||||
 | 
			
		||||
    /** Return Slot's state */
 | 
			
		||||
    SlotState
 | 
			
		||||
    getState() const
 | 
			
		||||
    {
 | 
			
		||||
        return state_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Return selected peers */
 | 
			
		||||
    std::set<id_t>
 | 
			
		||||
    getSelected() const;
 | 
			
		||||
 | 
			
		||||
    /** Get peers info. Return map of peer's state, count, squelch
 | 
			
		||||
     * expiration milsec, and last message time milsec.
 | 
			
		||||
     */
 | 
			
		||||
    std::
 | 
			
		||||
        unordered_map<id_t, std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
 | 
			
		||||
        getPeers() const;
 | 
			
		||||
 | 
			
		||||
    /** Check if peers stopped relaying messages. If a peer is
 | 
			
		||||
     * selected peer then call unsquelch handler for all
 | 
			
		||||
     * currently squelched peers and switch the slot to
 | 
			
		||||
@@ -220,7 +242,6 @@ private:
 | 
			
		||||
    std::chrono::seconds
 | 
			
		||||
    getSquelchDuration(std::size_t npeers);
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    /** Reset counts of peers in Selected or Counting state */
 | 
			
		||||
    void
 | 
			
		||||
    resetCounts();
 | 
			
		||||
@@ -229,13 +250,18 @@ private:
 | 
			
		||||
    void
 | 
			
		||||
    initCounting();
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    onWrite(beast::PropertyStream::Map& stream) const;
 | 
			
		||||
 | 
			
		||||
    /** Data maintained for each peer */
 | 
			
		||||
    struct PeerInfo
 | 
			
		||||
    {
 | 
			
		||||
        PeerState state;         // peer's state
 | 
			
		||||
        std::size_t count;       // message count
 | 
			
		||||
        time_point expire;       // squelch expiration time
 | 
			
		||||
        time_point lastMessage;  // time last message received
 | 
			
		||||
        PeerState state;            // peer's state
 | 
			
		||||
        std::size_t count;          // message count
 | 
			
		||||
        time_point expire;          // squelch expiration time
 | 
			
		||||
        time_point lastMessage;     // time last message received
 | 
			
		||||
        std::size_t timesSelected;  // number of times the peer was selected
 | 
			
		||||
        std::size_t timesCloseToThreshold;
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    std::unordered_map<id_t, PeerInfo> peers_;  // peer's data
 | 
			
		||||
@@ -257,6 +283,9 @@ private:
 | 
			
		||||
    // the maximum number of peers that should be selected as a validator
 | 
			
		||||
    // message source
 | 
			
		||||
    uint16_t const maxSelectedPeers_;
 | 
			
		||||
 | 
			
		||||
    // indicate if the slot is for a trusted validator
 | 
			
		||||
    bool const isTrusted_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
@@ -287,7 +316,6 @@ void
 | 
			
		||||
Slot<clock_type>::update(
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    id_t id,
 | 
			
		||||
    protocol::MessageType type,
 | 
			
		||||
    ignored_squelch_callback callback)
 | 
			
		||||
{
 | 
			
		||||
    using namespace std::chrono;
 | 
			
		||||
@@ -298,8 +326,15 @@ Slot<clock_type>::update(
 | 
			
		||||
    {
 | 
			
		||||
        JLOG(journal_.trace())
 | 
			
		||||
            << "update: adding peer " << Slice(validator) << " " << id;
 | 
			
		||||
        peers_.emplace(
 | 
			
		||||
            std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
 | 
			
		||||
        peers_.emplace(std::make_pair(
 | 
			
		||||
            id,
 | 
			
		||||
            PeerInfo{
 | 
			
		||||
                .state = PeerState::Counting,
 | 
			
		||||
                .count = 0,
 | 
			
		||||
                .expire = now,
 | 
			
		||||
                .lastMessage = now,
 | 
			
		||||
                .timesSelected = 0,
 | 
			
		||||
                .timesCloseToThreshold = 0}));
 | 
			
		||||
        initCounting();
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
@@ -321,8 +356,10 @@ Slot<clock_type>::update(
 | 
			
		||||
        << " slot state " << static_cast<int>(state_) << " peer state "
 | 
			
		||||
        << static_cast<int>(peer.state) << " count " << peer.count << " last "
 | 
			
		||||
        << duration_cast<milliseconds>(now - peer.lastMessage).count()
 | 
			
		||||
        << " pool " << considered_.size() << " threshold " << reachedThreshold_
 | 
			
		||||
        << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
 | 
			
		||||
        << " pool " << considered_.size() << " threshold " << reachedThreshold_;
 | 
			
		||||
 | 
			
		||||
    if (now - peer.lastMessage - IDLED <= milliseconds{500})
 | 
			
		||||
        ++peer.timesCloseToThreshold;
 | 
			
		||||
 | 
			
		||||
    peer.lastMessage = now;
 | 
			
		||||
 | 
			
		||||
@@ -349,6 +386,16 @@ Slot<clock_type>::update(
 | 
			
		||||
 | 
			
		||||
    if (reachedThreshold_ == maxSelectedPeers_)
 | 
			
		||||
    {
 | 
			
		||||
        for (auto const& [id, info] : peers_)
 | 
			
		||||
        {
 | 
			
		||||
            if (info.state == PeerState::Selected &&
 | 
			
		||||
                info.count < MIN_MESSAGE_THRESHOLD)
 | 
			
		||||
            {
 | 
			
		||||
                JLOG(journal_.debug())
 | 
			
		||||
                    << "update: previously selected peer " << id
 | 
			
		||||
                    << " failed to reach a threshold with: " << info.count;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        // Randomly select maxSelectedPeers_ peers from considered.
 | 
			
		||||
        // Exclude peers that have been idling > IDLED -
 | 
			
		||||
        // it's possible that deleteIdlePeer() has not been called yet.
 | 
			
		||||
@@ -403,9 +450,18 @@ Slot<clock_type>::update(
 | 
			
		||||
            v.count = 0;
 | 
			
		||||
 | 
			
		||||
            if (selected.find(k) != selected.end())
 | 
			
		||||
            {
 | 
			
		||||
                v.state = PeerState::Selected;
 | 
			
		||||
                ++v.timesSelected;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            else if (v.state != PeerState::Squelched)
 | 
			
		||||
            {
 | 
			
		||||
                if (v.state == PeerState::Selected)
 | 
			
		||||
                {
 | 
			
		||||
                    JLOG(journal_.debug())
 | 
			
		||||
                        << "squelching previously selected peer";
 | 
			
		||||
                }
 | 
			
		||||
                if (journal_.trace())
 | 
			
		||||
                    str << k << " ";
 | 
			
		||||
                v.state = PeerState::Squelched;
 | 
			
		||||
@@ -482,6 +538,35 @@ Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slot<clock_type>::onWrite(beast::PropertyStream::Map& stream) const
 | 
			
		||||
{
 | 
			
		||||
    auto const now = clock_type::now();
 | 
			
		||||
    stream["state"] = to_string(state_);
 | 
			
		||||
    stream["reachedThreshold"] = reachedThreshold_;
 | 
			
		||||
    stream["considered"] = considered_.size();
 | 
			
		||||
    stream["lastSelected"] =
 | 
			
		||||
        duration_cast<std::chrono::seconds>(now - lastSelected_).count();
 | 
			
		||||
    stream["isTrusted"] = isTrusted_;
 | 
			
		||||
 | 
			
		||||
    beast::PropertyStream::Set peers("peers", stream);
 | 
			
		||||
 | 
			
		||||
    for (auto const& [id, info] : peers_)
 | 
			
		||||
    {
 | 
			
		||||
        beast::PropertyStream::Map item(peers);
 | 
			
		||||
        item["id"] = id;
 | 
			
		||||
        item["count"] = info.count;
 | 
			
		||||
        item["expire"] =
 | 
			
		||||
            duration_cast<std::chrono::seconds>(info.expire - now).count();
 | 
			
		||||
        item["lastMessage"] =
 | 
			
		||||
            duration_cast<std::chrono::seconds>(now - info.lastMessage).count();
 | 
			
		||||
        item["timesSelected"] = info.timesSelected;
 | 
			
		||||
        item["timesCloseToThreshold"] = info.timesCloseToThreshold;
 | 
			
		||||
        item["state"] = to_string(info.state);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slot<clock_type>::resetCounts()
 | 
			
		||||
@@ -503,65 +588,18 @@ Slot<clock_type>::initCounting()
 | 
			
		||||
    resetCounts();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
std::uint16_t
 | 
			
		||||
Slot<clock_type>::inState(PeerState state) const
 | 
			
		||||
{
 | 
			
		||||
    return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
 | 
			
		||||
        return (it.second.state == state);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
std::uint16_t
 | 
			
		||||
Slot<clock_type>::notInState(PeerState state) const
 | 
			
		||||
{
 | 
			
		||||
    return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
 | 
			
		||||
        return (it.second.state != state);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
std::set<typename Peer::id_t>
 | 
			
		||||
Slot<clock_type>::getSelected() const
 | 
			
		||||
{
 | 
			
		||||
    std::set<id_t> r;
 | 
			
		||||
    for (auto const& [id, info] : peers_)
 | 
			
		||||
        if (info.state == PeerState::Selected)
 | 
			
		||||
            r.insert(id);
 | 
			
		||||
    return r;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
std::unordered_map<
 | 
			
		||||
    typename Peer::id_t,
 | 
			
		||||
    std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
 | 
			
		||||
Slot<clock_type>::getPeers() const
 | 
			
		||||
{
 | 
			
		||||
    using namespace std::chrono;
 | 
			
		||||
    auto r = std::unordered_map<
 | 
			
		||||
        id_t,
 | 
			
		||||
        std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
 | 
			
		||||
 | 
			
		||||
    for (auto const& [id, info] : peers_)
 | 
			
		||||
        r.emplace(std::make_pair(
 | 
			
		||||
            id,
 | 
			
		||||
            std::move(std::make_tuple(
 | 
			
		||||
                info.state,
 | 
			
		||||
                info.count,
 | 
			
		||||
                epoch<milliseconds>(info.expire).count(),
 | 
			
		||||
                epoch<milliseconds>(info.lastMessage).count()))));
 | 
			
		||||
 | 
			
		||||
    return r;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/** Slots is a container for validator's Slot and handles Slot update
 | 
			
		||||
 * when a message is received from a validator. It also handles Slot aging
 | 
			
		||||
 * and checks for peers which are disconnected or stopped relaying the messages.
 | 
			
		||||
 * and checks for peers which are disconnected or stopped relaying the
 | 
			
		||||
 * messages.
 | 
			
		||||
 */
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
class Slots final
 | 
			
		||||
{
 | 
			
		||||
    friend class test::enhanced_squelch_test;
 | 
			
		||||
    friend class test::base_squelch_test;
 | 
			
		||||
    friend class test::OverlaySim;
 | 
			
		||||
 | 
			
		||||
    using time_point = typename clock_type::time_point;
 | 
			
		||||
    using id_t = typename Peer::id_t;
 | 
			
		||||
    using messages = beast::aged_unordered_map<
 | 
			
		||||
@@ -569,6 +607,12 @@ class Slots final
 | 
			
		||||
        std::unordered_set<Peer::id_t>,
 | 
			
		||||
        clock_type,
 | 
			
		||||
        hardened_hash<strong_hash>>;
 | 
			
		||||
    using validators = beast::aged_unordered_map<
 | 
			
		||||
        PublicKey,
 | 
			
		||||
        std::unordered_set<Peer::id_t>,
 | 
			
		||||
        clock_type,
 | 
			
		||||
        hardened_hash<strong_hash>>;
 | 
			
		||||
    using slots_map = hash_map<PublicKey, Slot<clock_type>>;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    /**
 | 
			
		||||
@@ -576,14 +620,17 @@ public:
 | 
			
		||||
     * @param handler Squelch/unsquelch implementation
 | 
			
		||||
     * @param config reference to the global config
 | 
			
		||||
     */
 | 
			
		||||
    Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
 | 
			
		||||
    Slots(Logs& logs, SquelchHandler& handler, Config const& config)
 | 
			
		||||
        : handler_(handler)
 | 
			
		||||
        , logs_(logs)
 | 
			
		||||
        , journal_(logs.journal("Slots"))
 | 
			
		||||
        , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
 | 
			
		||||
        , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
 | 
			
		||||
        , enhancedSquelchEnabled_(
 | 
			
		||||
              config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE)
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ~Slots() = default;
 | 
			
		||||
 | 
			
		||||
    /** Check if base squelching feature is enabled and ready */
 | 
			
		||||
@@ -593,6 +640,13 @@ public:
 | 
			
		||||
        return baseSquelchEnabled_ && reduceRelayReady();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Check if enhanced squelching feature is enabled and ready */
 | 
			
		||||
    bool
 | 
			
		||||
    enhancedSquelchReady()
 | 
			
		||||
    {
 | 
			
		||||
        return enhancedSquelchEnabled_ && reduceRelayReady();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */
 | 
			
		||||
    bool
 | 
			
		||||
    reduceRelayReady()
 | 
			
		||||
@@ -605,108 +659,71 @@ public:
 | 
			
		||||
        return reduceRelayReady_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Calls Slot::update of Slot associated with the validator, with a noop
 | 
			
		||||
     * callback.
 | 
			
		||||
    /** Updates untrusted validator slot. Do not call for trusted
 | 
			
		||||
     * validators. The caller must ensure passed messages are unique.
 | 
			
		||||
     * @param key Message hash
 | 
			
		||||
     * @param validator Validator public key
 | 
			
		||||
     * @param id The ID of the peer that sent the message
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id)
 | 
			
		||||
    {
 | 
			
		||||
        updateValidatorSlot(key, validator, id, []() {});
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Updates untrusted validator slot. Do not call for trusted
 | 
			
		||||
     * validators. The caller must ensure passed messages are unique.
 | 
			
		||||
     * @param key Message hash
 | 
			
		||||
     * @param validator Validator public key
 | 
			
		||||
     * @param id The ID of the peer that sent the message
 | 
			
		||||
     * @param callback A callback to report ignored validations
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    updateValidatorSlot(
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        id_t id,
 | 
			
		||||
        typename Slot<clock_type>::ignored_squelch_callback callback);
 | 
			
		||||
 | 
			
		||||
    /** Calls Slot::update of Slot associated with the validator, with a
 | 
			
		||||
     * noop callback.
 | 
			
		||||
     * @param key Message's hash
 | 
			
		||||
     * @param validator Validator's public key
 | 
			
		||||
     * @param id Peer's id which received the message
 | 
			
		||||
     * @param type Received protocol message type
 | 
			
		||||
     * @param isTrusted Boolean to indicate if the message is from a trusted
 | 
			
		||||
     * validator
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    updateSlotAndSquelch(
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        id_t id,
 | 
			
		||||
        protocol::MessageType type)
 | 
			
		||||
        bool isTrusted)
 | 
			
		||||
    {
 | 
			
		||||
        updateSlotAndSquelch(key, validator, id, type, []() {});
 | 
			
		||||
        updateSlotAndSquelch(key, validator, id, []() {}, isTrusted);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Calls Slot::update of Slot associated with the validator.
 | 
			
		||||
     * @param key Message's hash
 | 
			
		||||
     * @param validator Validator's public key
 | 
			
		||||
     * @param id Peer's id which received the message
 | 
			
		||||
     * @param type Received protocol message type
 | 
			
		||||
     * @param callback A callback to report ignored validations
 | 
			
		||||
     * @param isTrusted Boolean to indicate if the message is from a trusted
 | 
			
		||||
     * validator
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    updateSlotAndSquelch(
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        id_t id,
 | 
			
		||||
        protocol::MessageType type,
 | 
			
		||||
        typename Slot<clock_type>::ignored_squelch_callback callback);
 | 
			
		||||
        typename Slot<clock_type>::ignored_squelch_callback callback,
 | 
			
		||||
        bool isTrusted);
 | 
			
		||||
 | 
			
		||||
    /** Check if peers stopped relaying messages
 | 
			
		||||
     * and if slots stopped receiving messages from the validator.
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    deleteIdlePeers();
 | 
			
		||||
 | 
			
		||||
    /** Return number of peers in state */
 | 
			
		||||
    std::optional<std::uint16_t>
 | 
			
		||||
    inState(PublicKey const& validator, PeerState state) const
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots_.find(validator);
 | 
			
		||||
        if (it != slots_.end())
 | 
			
		||||
            return it->second.inState(state);
 | 
			
		||||
        return {};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Return number of peers not in state */
 | 
			
		||||
    std::optional<std::uint16_t>
 | 
			
		||||
    notInState(PublicKey const& validator, PeerState state) const
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots_.find(validator);
 | 
			
		||||
        if (it != slots_.end())
 | 
			
		||||
            return it->second.notInState(state);
 | 
			
		||||
        return {};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Return true if Slot is in state */
 | 
			
		||||
    bool
 | 
			
		||||
    inState(PublicKey const& validator, SlotState state) const
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots_.find(validator);
 | 
			
		||||
        if (it != slots_.end())
 | 
			
		||||
            return it->second.state_ == state;
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Get selected peers */
 | 
			
		||||
    std::set<id_t>
 | 
			
		||||
    getSelected(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots_.find(validator);
 | 
			
		||||
        if (it != slots_.end())
 | 
			
		||||
            return it->second.getSelected();
 | 
			
		||||
        return {};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Get peers info. Return map of peer's state, count, and squelch
 | 
			
		||||
     * expiration milliseconds.
 | 
			
		||||
     */
 | 
			
		||||
    std::unordered_map<
 | 
			
		||||
        typename Peer::id_t,
 | 
			
		||||
        std::tuple<PeerState, uint16_t, uint32_t, std::uint32_t>>
 | 
			
		||||
    getPeers(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots_.find(validator);
 | 
			
		||||
        if (it != slots_.end())
 | 
			
		||||
            return it->second.getPeers();
 | 
			
		||||
        return {};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Get Slot's state */
 | 
			
		||||
    std::optional<SlotState>
 | 
			
		||||
    getState(PublicKey const& validator)
 | 
			
		||||
    {
 | 
			
		||||
        auto const& it = slots_.find(validator);
 | 
			
		||||
        if (it != slots_.end())
 | 
			
		||||
            return it->second.getState();
 | 
			
		||||
        return {};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Called when a peer is deleted. If the peer was selected to be the
 | 
			
		||||
     * source of messages from the validator then squelched peers have to be
 | 
			
		||||
     * unsquelched.
 | 
			
		||||
@@ -716,6 +733,26 @@ public:
 | 
			
		||||
    void
 | 
			
		||||
    deletePeer(id_t id, bool erase);
 | 
			
		||||
 | 
			
		||||
    /** Called to register that a given validator was squelched for a given
 | 
			
		||||
     * peer. It is expected that this method is called by SquelchHandler.
 | 
			
		||||
     *
 | 
			
		||||
     * @param key Validator public key
 | 
			
		||||
     * @param id peer ID
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    squelchValidator(PublicKey const& key, id_t id)
 | 
			
		||||
    {
 | 
			
		||||
        auto it = peersWithValidators_.find(key);
 | 
			
		||||
        if (it == peersWithValidators_.end())
 | 
			
		||||
            peersWithValidators_.emplace(key, std::unordered_set<id_t>{id});
 | 
			
		||||
 | 
			
		||||
        else if (it->second.find(id) == it->second.end())
 | 
			
		||||
            it->second.insert(id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    onWrite(beast::PropertyStream::Map& stream) const;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    /** Add message/peer if have not seen this message
 | 
			
		||||
     * from the peer. A message is aged after IDLED seconds.
 | 
			
		||||
@@ -723,15 +760,69 @@ private:
 | 
			
		||||
    bool
 | 
			
		||||
    addPeerMessage(uint256 const& key, id_t id);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Updates the last message sent from a validator.
 | 
			
		||||
     * @param validator The validator public key
 | 
			
		||||
     * @param peer The peer ID sending the message
 | 
			
		||||
     * @return true if the validator was updated, false otherwise
 | 
			
		||||
     */
 | 
			
		||||
    std::optional<PublicKey>
 | 
			
		||||
    updateConsideredValidator(PublicKey const& validator, Peer::id_t peer);
 | 
			
		||||
 | 
			
		||||
    /** Remove all validators that have become invalid due to selection
 | 
			
		||||
     * criteria
 | 
			
		||||
     * @return zero or more validators that have been removed.
 | 
			
		||||
     */
 | 
			
		||||
    std::vector<PublicKey>
 | 
			
		||||
    cleanConsideredValidators();
 | 
			
		||||
 | 
			
		||||
    /** Checks whether a given validator is squelched.
 | 
			
		||||
     * @param key Validator public key
 | 
			
		||||
     * @return true if a given validator was squelched
 | 
			
		||||
     */
 | 
			
		||||
    bool
 | 
			
		||||
    validatorSquelched(PublicKey const& key) const
 | 
			
		||||
    {
 | 
			
		||||
        beast::expire(
 | 
			
		||||
            peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
 | 
			
		||||
 | 
			
		||||
        return peersWithValidators_.find(key) != peersWithValidators_.end();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /** Checks whether a given peer was recently sent a squelch message for
 | 
			
		||||
     * a given validator.
 | 
			
		||||
     * @param key Validator public key
 | 
			
		||||
     * @param id Peer id
 | 
			
		||||
     * @return true if a given validator was squelched for a given peeru
 | 
			
		||||
     */
 | 
			
		||||
    bool
 | 
			
		||||
    peerSquelched(PublicKey const& key, id_t id) const
 | 
			
		||||
    {
 | 
			
		||||
        beast::expire(
 | 
			
		||||
            peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
 | 
			
		||||
 | 
			
		||||
        auto const it = peersWithValidators_.find(key);
 | 
			
		||||
 | 
			
		||||
        // if validator was not squelched, the peer was also not squelched
 | 
			
		||||
        if (it == peersWithValidators_.end())
 | 
			
		||||
            return false;
 | 
			
		||||
 | 
			
		||||
        // if a peer is found the squelch for it has not expired
 | 
			
		||||
        return it->second.find(id) != it->second.end();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::atomic_bool reduceRelayReady_{false};
 | 
			
		||||
 | 
			
		||||
    hash_map<PublicKey, Slot<clock_type>> slots_;
 | 
			
		||||
    SquelchHandler const& handler_;  // squelch/unsquelch handler
 | 
			
		||||
    slots_map slots_;
 | 
			
		||||
    slots_map untrusted_slots_;
 | 
			
		||||
 | 
			
		||||
    SquelchHandler& handler_;  // squelch/unsquelch handler
 | 
			
		||||
    Logs& logs_;
 | 
			
		||||
    beast::Journal const journal_;
 | 
			
		||||
 | 
			
		||||
    bool const baseSquelchEnabled_;
 | 
			
		||||
    uint16_t const maxSelectedPeers_;
 | 
			
		||||
    bool const enhancedSquelchEnabled_;
 | 
			
		||||
 | 
			
		||||
    // Maintain aged container of message/peers. This is required
 | 
			
		||||
    // to discard duplicate message from the same peer. A message
 | 
			
		||||
@@ -739,6 +830,22 @@ private:
 | 
			
		||||
    // after it was relayed is ignored by PeerImp.
 | 
			
		||||
    inline static messages peersWithMessage_{
 | 
			
		||||
        beast::get_abstract_clock<clock_type>()};
 | 
			
		||||
 | 
			
		||||
    // Maintain aged container of validator/peers. This is used to track
 | 
			
		||||
    // which validator/peer were squelced. A peer that whose squelch
 | 
			
		||||
    // has expired is removed.
 | 
			
		||||
    inline static validators peersWithValidators_{
 | 
			
		||||
        beast::get_abstract_clock<clock_type>()};
 | 
			
		||||
 | 
			
		||||
    struct ValidatorInfo
 | 
			
		||||
    {
 | 
			
		||||
        size_t count;  // the number of messages sent from this validator
 | 
			
		||||
        time_point lastMessage;                // timestamp of the last message
 | 
			
		||||
        std::unordered_set<Peer::id_t> peers;  // a list of peer IDs that sent a
 | 
			
		||||
                                               // message for this validator
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    hash_map<PublicKey, ValidatorInfo> considered_validators_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
@@ -774,61 +881,272 @@ Slots<clock_type>::addPeerMessage(uint256 const& key, id_t id)
 | 
			
		||||
    return true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
std::optional<PublicKey>
 | 
			
		||||
Slots<clock_type>::updateConsideredValidator(
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    Peer::id_t peer)
 | 
			
		||||
{
 | 
			
		||||
    auto const now = clock_type::now();
 | 
			
		||||
 | 
			
		||||
    auto it = considered_validators_.find(validator);
 | 
			
		||||
    if (it == considered_validators_.end())
 | 
			
		||||
    {
 | 
			
		||||
        considered_validators_.emplace(std::make_pair(
 | 
			
		||||
            validator,
 | 
			
		||||
            ValidatorInfo{
 | 
			
		||||
                .count = 1,
 | 
			
		||||
                .lastMessage = now,
 | 
			
		||||
                .peers = {peer},
 | 
			
		||||
            }));
 | 
			
		||||
 | 
			
		||||
        return {};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // the validator idled. Don't update it, it will be cleaned later
 | 
			
		||||
    if (now - it->second.lastMessage > IDLED)
 | 
			
		||||
        return {};
 | 
			
		||||
 | 
			
		||||
    it->second.peers.insert(peer);
 | 
			
		||||
 | 
			
		||||
    it->second.lastMessage = now;
 | 
			
		||||
    ++it->second.count;
 | 
			
		||||
 | 
			
		||||
    if (it->second.count < MAX_MESSAGE_THRESHOLD ||
 | 
			
		||||
        it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS)
 | 
			
		||||
        return {};
 | 
			
		||||
 | 
			
		||||
    auto const key = it->first;
 | 
			
		||||
    considered_validators_.erase(it);
 | 
			
		||||
 | 
			
		||||
    return key;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
std::vector<PublicKey>
 | 
			
		||||
Slots<clock_type>::cleanConsideredValidators()
 | 
			
		||||
{
 | 
			
		||||
    auto const now = clock_type::now();
 | 
			
		||||
 | 
			
		||||
    std::vector<PublicKey> keys;
 | 
			
		||||
    for (auto it = considered_validators_.begin();
 | 
			
		||||
         it != considered_validators_.end();)
 | 
			
		||||
    {
 | 
			
		||||
        if (now - it->second.lastMessage > IDLED)
 | 
			
		||||
        {
 | 
			
		||||
            keys.push_back(it->first);
 | 
			
		||||
            it = considered_validators_.erase(it);
 | 
			
		||||
        }
 | 
			
		||||
        else
 | 
			
		||||
            ++it;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return keys;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slots<clock_type>::updateSlotAndSquelch(
 | 
			
		||||
    uint256 const& key,
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    id_t id,
 | 
			
		||||
    protocol::MessageType type,
 | 
			
		||||
    typename Slot<clock_type>::ignored_squelch_callback callback)
 | 
			
		||||
    typename Slot<clock_type>::ignored_squelch_callback callback,
 | 
			
		||||
    bool isTrusted)
 | 
			
		||||
{
 | 
			
		||||
    if (!addPeerMessage(key, id))
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    auto it = slots_.find(validator);
 | 
			
		||||
    if (it == slots_.end())
 | 
			
		||||
    // If we receive a message from a trusted validator either update an
 | 
			
		||||
    // existing slot or insert a new one. If we are not running enhanced
 | 
			
		||||
    // squelching also deduplicate untrusted validator messages
 | 
			
		||||
    if (isTrusted || !enhancedSquelchEnabled_)
 | 
			
		||||
    {
 | 
			
		||||
        JLOG(journal_.trace())
 | 
			
		||||
            << "updateSlotAndSquelch: new slot " << Slice(validator);
 | 
			
		||||
        auto it =
 | 
			
		||||
            slots_
 | 
			
		||||
                .emplace(std::make_pair(
 | 
			
		||||
                    validator,
 | 
			
		||||
                    Slot<clock_type>(
 | 
			
		||||
                        handler_, logs_.journal("Slot"), maxSelectedPeers_)))
 | 
			
		||||
                .first;
 | 
			
		||||
        it->second.update(validator, id, type, callback);
 | 
			
		||||
        auto it = slots_
 | 
			
		||||
                      .emplace(std::make_pair(
 | 
			
		||||
                          validator,
 | 
			
		||||
                          Slot<clock_type>(
 | 
			
		||||
                              handler_,
 | 
			
		||||
                              logs_.journal("Slot"),
 | 
			
		||||
                              maxSelectedPeers_,
 | 
			
		||||
                              isTrusted)))
 | 
			
		||||
                      .first;
 | 
			
		||||
        it->second.update(validator, id, callback);
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
        it->second.update(validator, id, type, callback);
 | 
			
		||||
    {
 | 
			
		||||
        auto it = untrusted_slots_.find(validator);
 | 
			
		||||
        // If we received a message from a validator that is not
 | 
			
		||||
        // selected, and is not squelched, there is nothing to do. It
 | 
			
		||||
        // will be squelched later when `updateValidatorSlot` is called.
 | 
			
		||||
        if (it == untrusted_slots_.end())
 | 
			
		||||
            return;
 | 
			
		||||
 | 
			
		||||
        it->second.update(validator, id, callback);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slots<clock_type>::updateValidatorSlot(
 | 
			
		||||
    uint256 const& key,
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    id_t id,
 | 
			
		||||
    typename Slot<clock_type>::ignored_squelch_callback callback)
 | 
			
		||||
{
 | 
			
		||||
    // We received a message from an already selected validator
 | 
			
		||||
    // we can ignore this message
 | 
			
		||||
    if (untrusted_slots_.find(validator) != untrusted_slots_.end())
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    // We received a message from an already squelched validator.
 | 
			
		||||
    // This could happen in few cases:
 | 
			
		||||
    //      1. It happened so that the squelch for a particular peer expired
 | 
			
		||||
    //      before our local squelch.
 | 
			
		||||
    //      2. We receive a message from a new peer that did not receive the
 | 
			
		||||
    //      squelch request.
 | 
			
		||||
    //      3. The peer is ignoring our squelch request and we have not sent
 | 
			
		||||
    //      the controll message in a while.
 | 
			
		||||
    // In all of these cases we can only send them a squelch request again.
 | 
			
		||||
    if (validatorSquelched(validator))
 | 
			
		||||
    {
 | 
			
		||||
        if (!peerSquelched(validator, id))
 | 
			
		||||
        {
 | 
			
		||||
            squelchValidator(validator, id);
 | 
			
		||||
            handler_.squelch(
 | 
			
		||||
                validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
 | 
			
		||||
        }
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // update a slot if the message is from a selected untrusted validator
 | 
			
		||||
    if (auto const& it = untrusted_slots_.find(validator);
 | 
			
		||||
        it != untrusted_slots_.end())
 | 
			
		||||
    {
 | 
			
		||||
        it->second.update(validator, id, callback);
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Do we have any available slots for additional untrusted validators?
 | 
			
		||||
    // This could happen in few cases:
 | 
			
		||||
    //      1. We received a message from a new untrusted validator, but we
 | 
			
		||||
    //      are at capacity.
 | 
			
		||||
    //      2. We received a message from a previously squelched validator.
 | 
			
		||||
    // In all of these cases we send a squelch message to all peers.
 | 
			
		||||
    // The validator may still  be considered by the selector. However, it
 | 
			
		||||
    // will be eventually cleaned and squelched
 | 
			
		||||
    if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS)
 | 
			
		||||
    {
 | 
			
		||||
        handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (auto const v = updateConsideredValidator(validator, id))
 | 
			
		||||
        untrusted_slots_.emplace(std::make_pair(
 | 
			
		||||
            *v,
 | 
			
		||||
            Slot<clock_type>(
 | 
			
		||||
                handler_, logs_.journal("Slot"), maxSelectedPeers_, false)));
 | 
			
		||||
    // When we reach MAX_UNTRUSTED_SLOTS, don't  explicitly clean them.
 | 
			
		||||
    // Since we stop updating their counters, they will idle, and will be
 | 
			
		||||
    // removed and squelched.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slots<clock_type>::deletePeer(id_t id, bool erase)
 | 
			
		||||
{
 | 
			
		||||
    for (auto& [validator, slot] : slots_)
 | 
			
		||||
        slot.deletePeer(validator, id, erase);
 | 
			
		||||
    auto deletePeer = [&](slots_map& slots) {
 | 
			
		||||
        for (auto& [validator, slot] : slots)
 | 
			
		||||
            slot.deletePeer(validator, id, erase);
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    deletePeer(slots_);
 | 
			
		||||
    deletePeer(untrusted_slots_);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slots<clock_type>::deleteIdlePeers()
 | 
			
		||||
{
 | 
			
		||||
    auto now = clock_type::now();
 | 
			
		||||
    auto deleteSlots = [&](slots_map& slots) {
 | 
			
		||||
        auto const now = clock_type::now();
 | 
			
		||||
 | 
			
		||||
    for (auto it = slots_.begin(); it != slots_.end();)
 | 
			
		||||
    {
 | 
			
		||||
        it->second.deleteIdlePeer(it->first);
 | 
			
		||||
        if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
 | 
			
		||||
        for (auto it = slots.begin(); it != slots.end();)
 | 
			
		||||
        {
 | 
			
		||||
            JLOG(journal_.trace())
 | 
			
		||||
                << "deleteIdlePeers: deleting idle slot " << Slice(it->first);
 | 
			
		||||
            it = slots_.erase(it);
 | 
			
		||||
            it->second.deleteIdlePeer(it->first);
 | 
			
		||||
            if (now - it->second.getLastSelected() >
 | 
			
		||||
                MAX_UNSQUELCH_EXPIRE_DEFAULT)
 | 
			
		||||
            {
 | 
			
		||||
                JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot "
 | 
			
		||||
                                       << Slice(it->first);
 | 
			
		||||
 | 
			
		||||
                // if an untrusted validator slot idled - peers stopped
 | 
			
		||||
                // sending messages for this validator squelch it
 | 
			
		||||
                if (!it->second.isTrusted_)
 | 
			
		||||
                    handler_.squelchAll(
 | 
			
		||||
                        it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
 | 
			
		||||
 | 
			
		||||
                it = slots.erase(it);
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
                ++it;
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    deleteSlots(slots_);
 | 
			
		||||
    deleteSlots(untrusted_slots_);
 | 
			
		||||
 | 
			
		||||
    // remove and squelch all validators that the selector deemed unsuitable
 | 
			
		||||
    // there might be some good validators in this set that "lapsed".
 | 
			
		||||
    // However, since these are untrusted validators we're not concerned
 | 
			
		||||
    for (auto const& validator : cleanConsideredValidators())
 | 
			
		||||
        handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename clock_type>
 | 
			
		||||
void
 | 
			
		||||
Slots<clock_type>::onWrite(beast::PropertyStream::Map& stream) const
 | 
			
		||||
{
 | 
			
		||||
    auto const writeSlot =
 | 
			
		||||
        [](beast::PropertyStream::Set& set,
 | 
			
		||||
           hash_map<PublicKey, Slot<clock_type>> const& slots) {
 | 
			
		||||
            for (auto const& [validator, slot] : slots)
 | 
			
		||||
            {
 | 
			
		||||
                beast::PropertyStream::Map item(set);
 | 
			
		||||
                item["validator"] = toBase58(TokenType::NodePublic, validator);
 | 
			
		||||
                slot.onWrite(item);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
    beast::PropertyStream::Map slots("slots", stream);
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
        beast::PropertyStream::Set set("trusted", slots);
 | 
			
		||||
        writeSlot(set, slots_);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
        beast::PropertyStream::Set set("untrusted", slots);
 | 
			
		||||
        writeSlot(set, untrusted_slots_);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
        beast::PropertyStream::Set set("considered", slots);
 | 
			
		||||
 | 
			
		||||
        auto const now = clock_type::now();
 | 
			
		||||
 | 
			
		||||
        for (auto const& [validator, info] : considered_validators_)
 | 
			
		||||
        {
 | 
			
		||||
            beast::PropertyStream::Map item(set);
 | 
			
		||||
            item["validator"] = toBase58(TokenType::NodePublic, validator);
 | 
			
		||||
            item["lastMessage"] =
 | 
			
		||||
                std::chrono::duration_cast<std::chrono::seconds>(
 | 
			
		||||
                    now - info.lastMessage)
 | 
			
		||||
                    .count();
 | 
			
		||||
            item["messageCount"] = info.count;
 | 
			
		||||
            item["peers"] = info.peers.size();
 | 
			
		||||
        }
 | 
			
		||||
        else
 | 
			
		||||
            ++it;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -22,12 +22,11 @@
 | 
			
		||||
 | 
			
		||||
#include <xrpld/overlay/ReduceRelayCommon.h>
 | 
			
		||||
 | 
			
		||||
#include <xrpl/basics/Log.h>
 | 
			
		||||
#include <xrpl/beast/utility/Journal.h>
 | 
			
		||||
#include <xrpl/protocol/PublicKey.h>
 | 
			
		||||
 | 
			
		||||
#include <algorithm>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <functional>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
 | 
			
		||||
@@ -108,7 +107,7 @@ template <typename clock_type>
 | 
			
		||||
bool
 | 
			
		||||
Squelch<clock_type>::expireSquelch(PublicKey const& validator)
 | 
			
		||||
{
 | 
			
		||||
    auto now = clock_type::now();
 | 
			
		||||
    auto const now = clock_type::now();
 | 
			
		||||
 | 
			
		||||
    auto const& it = squelched_.find(validator);
 | 
			
		||||
    if (it == squelched_.end())
 | 
			
		||||
 
 | 
			
		||||
@@ -578,16 +578,23 @@ OverlayImpl::stop()
 | 
			
		||||
void
 | 
			
		||||
OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
 | 
			
		||||
{
 | 
			
		||||
    beast::PropertyStream::Set set("traffic", stream);
 | 
			
		||||
    auto const stats = m_traffic.getCounts();
 | 
			
		||||
    for (auto const& pair : stats)
 | 
			
		||||
    {
 | 
			
		||||
        beast::PropertyStream::Map item(set);
 | 
			
		||||
        item["category"] = pair.second.name;
 | 
			
		||||
        item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
 | 
			
		||||
        item["messages_in"] = std::to_string(pair.second.messagesIn.load());
 | 
			
		||||
        item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
 | 
			
		||||
        item["messages_out"] = std::to_string(pair.second.messagesOut.load());
 | 
			
		||||
        beast::PropertyStream::Set set("traffic", stream);
 | 
			
		||||
        auto const stats = m_traffic.getCounts();
 | 
			
		||||
        for (auto const& pair : stats)
 | 
			
		||||
        {
 | 
			
		||||
            beast::PropertyStream::Map item(set);
 | 
			
		||||
            item["category"] = pair.second.name;
 | 
			
		||||
            item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
 | 
			
		||||
            item["messages_in"] = std::to_string(pair.second.messagesIn.load());
 | 
			
		||||
            item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
 | 
			
		||||
            item["messages_out"] =
 | 
			
		||||
                std::to_string(pair.second.messagesOut.load());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
        slots_.onWrite(stream);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -1410,12 +1417,21 @@ OverlayImpl::squelch(
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration)
 | 
			
		||||
{
 | 
			
		||||
    for_each([&](std::shared_ptr<PeerImp>&& p) {
 | 
			
		||||
        slots_.squelchValidator(validator, p->id());
 | 
			
		||||
        p->send(makeSquelchMessage(validator, true, squelchDuration));
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
OverlayImpl::updateSlotAndSquelch(
 | 
			
		||||
    uint256 const& key,
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    std::set<Peer::id_t>&& peers,
 | 
			
		||||
    protocol::MessageType type)
 | 
			
		||||
    bool isTrusted)
 | 
			
		||||
{
 | 
			
		||||
    if (!slots_.baseSquelchReady())
 | 
			
		||||
        return;
 | 
			
		||||
@@ -1423,14 +1439,22 @@ OverlayImpl::updateSlotAndSquelch(
 | 
			
		||||
    if (!strand_.running_in_this_thread())
 | 
			
		||||
        return post(
 | 
			
		||||
            strand_,
 | 
			
		||||
            [this, key, validator, peers = std::move(peers), type]() mutable {
 | 
			
		||||
                updateSlotAndSquelch(key, validator, std::move(peers), type);
 | 
			
		||||
            [this,
 | 
			
		||||
             key,
 | 
			
		||||
             validator,
 | 
			
		||||
             peers = std::move(peers),
 | 
			
		||||
             isTrusted]() mutable {
 | 
			
		||||
                updateSlotAndSquelch(
 | 
			
		||||
                    key, validator, std::move(peers), isTrusted);
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
    for (auto id : peers)
 | 
			
		||||
        slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
 | 
			
		||||
            reportInboundTraffic(TrafficCount::squelch_ignored, 0);
 | 
			
		||||
        });
 | 
			
		||||
        slots_.updateSlotAndSquelch(
 | 
			
		||||
            key,
 | 
			
		||||
            validator,
 | 
			
		||||
            id,
 | 
			
		||||
            [&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
 | 
			
		||||
            isTrusted);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
@@ -1438,17 +1462,39 @@ OverlayImpl::updateSlotAndSquelch(
 | 
			
		||||
    uint256 const& key,
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    Peer::id_t peer,
 | 
			
		||||
    protocol::MessageType type)
 | 
			
		||||
    bool isTrusted)
 | 
			
		||||
{
 | 
			
		||||
    if (!slots_.baseSquelchReady())
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    if (!strand_.running_in_this_thread())
 | 
			
		||||
        return post(strand_, [this, key, validator, peer, type]() {
 | 
			
		||||
            updateSlotAndSquelch(key, validator, peer, type);
 | 
			
		||||
        return post(strand_, [this, key, validator, peer, isTrusted]() {
 | 
			
		||||
            updateSlotAndSquelch(key, validator, peer, isTrusted);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
    slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
 | 
			
		||||
    slots_.updateSlotAndSquelch(
 | 
			
		||||
        key,
 | 
			
		||||
        validator,
 | 
			
		||||
        peer,
 | 
			
		||||
        [&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
 | 
			
		||||
        isTrusted);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
OverlayImpl::updateValidatorSlot(
 | 
			
		||||
    uint256 const& key,
 | 
			
		||||
    PublicKey const& validator,
 | 
			
		||||
    Peer::id_t peer)
 | 
			
		||||
{
 | 
			
		||||
    if (!slots_.enhancedSquelchReady())
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    if (!strand_.running_in_this_thread())
 | 
			
		||||
        return post(strand_, [this, key, validator, peer]() {
 | 
			
		||||
            updateValidatorSlot(key, validator, peer);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
    slots_.updateValidatorSlot(key, validator, peer, [&]() {
 | 
			
		||||
        reportInboundTraffic(TrafficCount::squelch_ignored, 0);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -399,14 +399,14 @@ public:
 | 
			
		||||
     * @param key Unique message's key
 | 
			
		||||
     * @param validator Validator's public key
 | 
			
		||||
     * @param peers Peers' id to update the slots for
 | 
			
		||||
     * @param type Received protocol message type
 | 
			
		||||
     * @param isTrusted Indicate if the validator is trusted
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    updateSlotAndSquelch(
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        std::set<Peer::id_t>&& peers,
 | 
			
		||||
        protocol::MessageType type);
 | 
			
		||||
        bool isTrusted);
 | 
			
		||||
 | 
			
		||||
    /** Overload to reduce allocation in case of single peer
 | 
			
		||||
     */
 | 
			
		||||
@@ -415,7 +415,22 @@ public:
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        Peer::id_t peer,
 | 
			
		||||
        protocol::MessageType type);
 | 
			
		||||
        bool isTrusted);
 | 
			
		||||
 | 
			
		||||
    /** Updates the slot information for an untrusted validator. If the
 | 
			
		||||
     * untrusted validator was previously squelched, sends TMSquelch message to
 | 
			
		||||
     * the sender of the message. If there are no untrusted slots available
 | 
			
		||||
     * sends TMSquelch message to all peers to squelch messages from the
 | 
			
		||||
     * validator.
 | 
			
		||||
     * @param key Unique message's key
 | 
			
		||||
     * @param validator Validator's public key
 | 
			
		||||
     * @param peers Peers' id to update the slots for
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    updateValidatorSlot(
 | 
			
		||||
        uint256 const& key,
 | 
			
		||||
        PublicKey const& validator,
 | 
			
		||||
        Peer::id_t peer);
 | 
			
		||||
 | 
			
		||||
    /** Called when the peer is deleted. If the peer was selected to be the
 | 
			
		||||
     * source of messages from the validator then squelched peers have to be
 | 
			
		||||
@@ -451,6 +466,10 @@ private:
 | 
			
		||||
        Peer::id_t const id,
 | 
			
		||||
        std::uint32_t squelchDuration) const override;
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    squelchAll(PublicKey const& validator, std::uint32_t squelchDuration)
 | 
			
		||||
        override;
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    unsquelch(PublicKey const& validator, Peer::id_t id) const override;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1699,21 +1699,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
 | 
			
		||||
    // suppression for 30 seconds to avoid doing a relatively expensive lookup
 | 
			
		||||
    // every time a spam packet is received
 | 
			
		||||
    PublicKey const publicKey{makeSlice(set.nodepubkey())};
 | 
			
		||||
    auto const isTrusted = app_.validators().trusted(publicKey);
 | 
			
		||||
 | 
			
		||||
    // If the operator has specified that untrusted proposals be dropped then
 | 
			
		||||
    // this happens here I.e. before further wasting CPU verifying the signature
 | 
			
		||||
    // of an untrusted key
 | 
			
		||||
    if (!isTrusted)
 | 
			
		||||
    {
 | 
			
		||||
        // report untrusted proposal messages
 | 
			
		||||
        overlay_.reportInboundTraffic(
 | 
			
		||||
            TrafficCount::category::proposal_untrusted,
 | 
			
		||||
            Message::messageSize(*m));
 | 
			
		||||
 | 
			
		||||
        if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
 | 
			
		||||
            return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    uint256 const proposeHash{set.currenttxhash()};
 | 
			
		||||
    uint256 const prevLedger{set.previousledger()};
 | 
			
		||||
@@ -1728,7 +1713,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
 | 
			
		||||
        publicKey.slice(),
 | 
			
		||||
        sig);
 | 
			
		||||
 | 
			
		||||
    if (auto [added, relayed] =
 | 
			
		||||
    auto const isTrusted = app_.validators().trusted(publicKey);
 | 
			
		||||
 | 
			
		||||
    if (auto const& [added, relayed] =
 | 
			
		||||
            app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
 | 
			
		||||
        !added)
 | 
			
		||||
    {
 | 
			
		||||
@@ -1736,7 +1723,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
 | 
			
		||||
        // receives within IDLED seconds since the message has been relayed.
 | 
			
		||||
        if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
 | 
			
		||||
            overlay_.updateSlotAndSquelch(
 | 
			
		||||
                suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
 | 
			
		||||
                suppression, publicKey, id_, isTrusted);
 | 
			
		||||
 | 
			
		||||
        // report duplicate proposal messages
 | 
			
		||||
        overlay_.reportInboundTraffic(
 | 
			
		||||
@@ -1750,6 +1737,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
 | 
			
		||||
 | 
			
		||||
    if (!isTrusted)
 | 
			
		||||
    {
 | 
			
		||||
        overlay_.reportInboundTraffic(
 | 
			
		||||
            TrafficCount::category::proposal_untrusted,
 | 
			
		||||
            Message::messageSize(*m));
 | 
			
		||||
 | 
			
		||||
        // If the operator has specified that untrusted proposals be dropped
 | 
			
		||||
        // then this happens here I.e. before further wasting CPU verifying the
 | 
			
		||||
        // signature of an untrusted key
 | 
			
		||||
        if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
 | 
			
		||||
            return;
 | 
			
		||||
 | 
			
		||||
        if (tracking_.load() == Tracking::diverged)
 | 
			
		||||
        {
 | 
			
		||||
            JLOG(p_journal_.debug())
 | 
			
		||||
@@ -2358,20 +2355,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
 | 
			
		||||
        auto const isTrusted =
 | 
			
		||||
            app_.validators().trusted(val->getSignerPublic());
 | 
			
		||||
 | 
			
		||||
        // If the operator has specified that untrusted validations be
 | 
			
		||||
        // dropped then this happens here I.e. before further wasting CPU
 | 
			
		||||
        // verifying the signature of an untrusted key
 | 
			
		||||
        if (!isTrusted)
 | 
			
		||||
        {
 | 
			
		||||
            // increase untrusted validations received
 | 
			
		||||
            overlay_.reportInboundTraffic(
 | 
			
		||||
                TrafficCount::category::validation_untrusted,
 | 
			
		||||
                Message::messageSize(*m));
 | 
			
		||||
 | 
			
		||||
            if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
 | 
			
		||||
                return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        auto key = sha512Half(makeSlice(m->validation()));
 | 
			
		||||
 | 
			
		||||
        auto [added, relayed] =
 | 
			
		||||
@@ -2384,7 +2367,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
 | 
			
		||||
            // relayed.
 | 
			
		||||
            if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
 | 
			
		||||
                overlay_.updateSlotAndSquelch(
 | 
			
		||||
                    key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
 | 
			
		||||
                    key, val->getSignerPublic(), id_, isTrusted);
 | 
			
		||||
 | 
			
		||||
            // increase duplicate validations received
 | 
			
		||||
            overlay_.reportInboundTraffic(
 | 
			
		||||
@@ -2395,6 +2378,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // at this point the message is guaranteed to be unique
 | 
			
		||||
        if (!isTrusted)
 | 
			
		||||
        {
 | 
			
		||||
            overlay_.reportInboundTraffic(
 | 
			
		||||
                TrafficCount::category::validation_untrusted,
 | 
			
		||||
                Message::messageSize(*m));
 | 
			
		||||
 | 
			
		||||
            overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_);
 | 
			
		||||
 | 
			
		||||
            // If the operator has specified that untrusted validations be
 | 
			
		||||
            // dropped then this happens here I.e. before further wasting CPU
 | 
			
		||||
            // verifying the signature of an untrusted key
 | 
			
		||||
            if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
 | 
			
		||||
                return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (!isTrusted && (tracking_.load() == Tracking::diverged))
 | 
			
		||||
        {
 | 
			
		||||
            JLOG(p_journal_.debug())
 | 
			
		||||
@@ -2415,6 +2414,23 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
 | 
			
		||||
                return ret;
 | 
			
		||||
            }();
 | 
			
		||||
 | 
			
		||||
            std::stringstream ss;
 | 
			
		||||
            ss << "PEER_IMP_VALIDATION: "
 | 
			
		||||
                  "ledger_hash: "
 | 
			
		||||
               << val->getLedgerHash() << " is_trusted: " << isTrusted
 | 
			
		||||
               << " master_key: ";
 | 
			
		||||
            auto master =
 | 
			
		||||
                app_.validators().getTrustedKey(val->getSignerPublic());
 | 
			
		||||
            if (master)
 | 
			
		||||
            {
 | 
			
		||||
                ss << toBase58(TokenType::NodePublic, *master);
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                ss << "none";
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            JLOG(p_journal_.debug()) << ss.str();
 | 
			
		||||
            std::weak_ptr<PeerImp> weak = shared_from_this();
 | 
			
		||||
            app_.getJobQueue().addJob(
 | 
			
		||||
                isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
 | 
			
		||||
@@ -3008,7 +3024,7 @@ PeerImp::checkPropose(
 | 
			
		||||
                peerPos.suppressionID(),
 | 
			
		||||
                peerPos.publicKey(),
 | 
			
		||||
                std::move(haveMessage),
 | 
			
		||||
                protocol::mtPROPOSE_LEDGER);
 | 
			
		||||
                isTrusted);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -3044,7 +3060,7 @@ PeerImp::checkValidation(
 | 
			
		||||
                    key,
 | 
			
		||||
                    val->getSignerPublic(),
 | 
			
		||||
                    std::move(haveMessage),
 | 
			
		||||
                    protocol::mtVALIDATION);
 | 
			
		||||
                    val->isTrusted());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user