Compare commits

...

39 Commits

Author SHA1 Message Date
Vito
09065d90bf forces a copy of reference parameter in OverlayImpl::updateUntrustedValidatorSlot 2025-08-05 12:51:28 +02:00
Vito
45078538e9 Merge branch 'develop' into tapanito/feature/enhanced-squelching 2025-08-05 12:28:59 +02:00
Vito
6a1be03732 ammends documentation changig callback to report 2025-07-31 15:05:45 +02:00
Vito
5b8dd45261 clean up imports, and typos 2025-07-31 12:48:13 +02:00
Vito Tumas
395c64bb52 Merge branch 'develop' into tapanito/feature/enhanced-squelching 2025-07-30 18:23:43 +02:00
Vito
3ca6dda72d improves expired squelch deletion 2025-07-30 18:22:10 +02:00
Vito
762fd3b6a5 improves code readability 2025-07-30 18:22:09 +02:00
Vito
f2b13797d1 adds documentation and improves squelching comments 2025-07-30 18:22:09 +02:00
Vito
4be9e6b284 removes redundant checks before erasing elements 2025-07-30 18:22:08 +02:00
Vito
4ab1e1e163 extends enhanced squelching to squelch a validator squelched by majority of peers 2025-07-30 18:22:08 +02:00
Vito
2ec5add603 adds squelchStore 2025-07-30 18:22:06 +02:00
Vito
8087785204 changes selection parameters and number of untrusted slots
Simulation results revealed that by requiring a minimum number of peers to send a message, valdiations will stop propagating, as the servers that are directly connected to peers will receive a unique message from only a single peer, the validator. Therefore, they will fail to select a slot.
Furthermore, by increasing untrusted slots to 30 we guarantee that some validators will propagate through most of the network.
2025-07-30 18:21:23 +02:00
Vito
a9a9b9976a improves code readability 2025-07-30 18:21:22 +02:00
Vito
eb805654e0 adds logic to reset validator progress and better deletion safeguards
If the validator was idle for a short period, reset it's progress. However, if the validator was idle for a long time, delete and squelch it.
Similarly, if the validator sent a lot of unique messages, but failed to reach peering constraints, squelch it.
2025-07-30 18:21:21 +02:00
Vito
8156a70f0e removes redundant scope 2025-07-30 18:21:20 +02:00
Vito
5cba8d653e adds callback to squelchAll instead of calling slots directly 2025-07-30 18:21:19 +02:00
Vito
99b2aa3702 further improves member function and attribute names 2025-07-30 18:21:18 +02:00
Vito
dcb832ecf9 improves code readability 2025-07-30 18:21:18 +02:00
Vito
5e6189d656 fixes windows tests 2025-07-30 18:21:17 +02:00
Vito
2c47fbf6c8 fixes code formatting 2025-07-30 18:21:16 +02:00
Vito
0ef0826466 decouples tests from squelching implementation 2025-07-30 18:21:16 +02:00
Vito
4960ba0f8c improves code readability 2025-07-30 18:21:15 +02:00
Vito
d22a2b82a7 removes unused test parameters 2025-07-30 18:21:14 +02:00
Vito
ea8763060c removes unused imports 2025-07-30 18:21:13 +02:00
Vito
4e9e245a03 removes unused clock 2025-07-30 18:21:13 +02:00
Vito
450129cc16 improves code readabiliy 2025-07-30 18:21:12 +02:00
Vito
8ee5d129b2 refactors squelching to use instanced clock instead of a static clock 2025-07-30 18:21:11 +02:00
Vito
529687ea73 removes duplicate untrusted slot check 2025-07-30 18:21:11 +02:00
Vito Tumas
a57fa169a8 Update src/xrpld/overlay/detail/OverlayImpl.h
Co-authored-by: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com>
2025-07-30 18:21:10 +02:00
Vito
adfbb48ac6 fixes unittests for windows 2025-07-30 18:21:09 +02:00
Vito
ae38c2a2ce removes member functions from Slots used only for testing, and moves them to tests 2025-07-30 18:21:09 +02:00
Vito
15b43bda0d adds enhanced squelching tests 2025-07-30 18:21:08 +02:00
Vito
0f2fec66dc adds methods to print slot details from command 2025-07-30 18:21:07 +02:00
Vito
cf4f95a9c9 feature: extend squelching to suppress untrusted validator traffic
This feature improves network efficiency by limiting message propagation from untrusted validators.

Squelching currently reduces the volume of duplicate messages from validators but does not address the volume of unique messages from untrusted validators, who may not contribute meaningfully to network progress.

This change introduces a bounded number of slots for untrusted validators, selected based on message frequency. Once selected, their duplicate messages are subject to standard squelching logic, thereby reducing overall message overhead without impacting trusted validator performance.
2025-07-30 18:21:07 +02:00
Vito
2599defe98 adds data strcutures and methods to track red validators 2025-07-30 18:21:06 +02:00
Vito
54678dc2a4 adds method to SquelchHandler to squelch all peers 2025-07-30 18:21:06 +02:00
Vito
6bbd836adc adds methods to track which peers and validators were squelched 2025-07-30 18:21:05 +02:00
Vito
48471d5a2b adds isTrusted parameter to updateSlotAndSquelch method to differentiate messages from trusted validators 2025-07-30 18:21:05 +02:00
Vito
d37e422683 adds config option for enhanced squelching 2025-07-30 18:21:04 +02:00
16 changed files with 3392 additions and 1093 deletions

View File

@@ -17,24 +17,22 @@
*/
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/Env.h>
#include <xrpld/overlay/Message.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/Slot.h>
#include <xrpld/overlay/Squelch.h>
#include <xrpld/overlay/SquelchStore.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/protocol/SecretKey.h>
#include <xrpl/protocol/messages.h>
#include <boost/thread.hpp>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <iterator>
#include <numeric>
#include <optional>
@@ -44,6 +42,21 @@ namespace test {
using namespace std::chrono;
template <class Clock>
class extended_manual_clock : public beast::manual_clock<Clock>
{
public:
using typename beast::manual_clock<Clock>::duration;
using typename beast::manual_clock<Clock>::time_point;
void
randAdvance(std::chrono::milliseconds min, std::chrono::milliseconds max)
{
auto ms = ripple::rand_int(min.count(), max.count());
this->advance(std::chrono::milliseconds(ms));
}
};
class Link;
using MessageSPtr = std::shared_ptr<Message>;
@@ -54,6 +67,7 @@ using SquelchCB =
std::function<void(PublicKey const&, PeerWPtr const&, std::uint32_t)>;
using UnsquelchCB = std::function<void(PublicKey const&, PeerWPtr const&)>;
using LinkIterCB = std::function<void(Link&, MessageSPtr)>;
using TestStopwatch = extended_manual_clock<std::chrono::steady_clock>;
static constexpr std::uint32_t MAX_PEERS = 10;
static constexpr std::uint32_t MAX_VALIDATORS = 10;
@@ -191,52 +205,6 @@ public:
}
};
/** Manually advanced clock. */
class ManualClock
{
public:
typedef uint64_t rep;
typedef std::milli period;
typedef std::chrono::duration<std::uint32_t, period> duration;
typedef std::chrono::time_point<ManualClock> time_point;
inline static bool const is_steady = false;
static void
advance(duration d) noexcept
{
now_ += d;
}
static void
randAdvance(milliseconds min, milliseconds max)
{
now_ += randDuration(min, max);
}
static void
reset() noexcept
{
now_ = time_point(seconds(0));
}
static time_point
now() noexcept
{
return now_;
}
static duration
randDuration(milliseconds min, milliseconds max)
{
return duration(milliseconds(rand_int(min.count(), max.count())));
}
explicit ManualClock() = default;
private:
inline static time_point now_ = time_point(seconds(0));
};
/** Simulate server's OverlayImpl */
class Overlay
{
@@ -249,12 +217,20 @@ public:
uint256 const& key,
PublicKey const& validator,
Peer::id_t id,
SquelchCB f,
protocol::MessageType type = protocol::mtVALIDATION) = 0;
SquelchCB f) = 0;
virtual void deleteIdlePeers(UnsquelchCB) = 0;
virtual void deletePeer(Peer::id_t, UnsquelchCB) = 0;
TestStopwatch&
clock()
{
return clock_;
}
protected:
TestStopwatch clock_;
};
class Validator;
@@ -457,19 +433,39 @@ private:
std::uint16_t id_ = 0;
};
class BaseSquelchingTestSlots : public reduce_relay::Slots
{
using Slots = reduce_relay::Slots;
public:
BaseSquelchingTestSlots(
Logs& logs,
reduce_relay::SquelchHandler& handler,
Config const& config,
reduce_relay::Slots::clock_type& clock)
: Slots(logs, handler, config, clock)
{
}
Slots::slots_map const&
getSlots() const
{
return trustedSlots_;
}
};
class PeerSim : public PeerPartial, public std::enable_shared_from_this<PeerSim>
{
public:
using id_t = Peer::id_t;
PeerSim(Overlay& overlay, beast::Journal journal)
: overlay_(overlay), squelch_(journal)
: overlay_(overlay), squelchStore_(journal, overlay_.clock())
{
id_ = sid_++;
}
~PeerSim() = default;
id_t
Peer::id_t
id() const override
{
return id_;
@@ -487,7 +483,7 @@ public:
{
auto validator = m->getValidatorKey();
assert(validator);
if (!squelch_.expireSquelch(*validator))
if (squelchStore_.isSquelched(*validator))
return;
overlay_.updateSlotAndSquelch({}, *validator, id(), f);
@@ -499,18 +495,17 @@ public:
{
auto validator = squelch.validatorpubkey();
PublicKey key(Slice(validator.data(), validator.size()));
if (squelch.squelch())
squelch_.addSquelch(
key, std::chrono::seconds{squelch.squelchduration()});
else
squelch_.removeSquelch(key);
squelchStore_.handleSquelch(
key,
squelch.squelch(),
std::chrono::seconds{squelch.squelchduration()});
}
private:
inline static id_t sid_ = 0;
id_t id_;
inline static Peer::id_t sid_ = 0;
Peer::id_t id_;
Overlay& overlay_;
reduce_relay::Squelch<ManualClock> squelch_;
reduce_relay::SquelchStore squelchStore_;
};
class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
@@ -518,10 +513,9 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
using Peers = std::unordered_map<Peer::id_t, PeerSPtr>;
public:
using id_t = Peer::id_t;
using clock_type = ManualClock;
using clock_type = TestStopwatch;
OverlaySim(Application& app)
: slots_(app.logs(), *this, app.config()), logs_(app.logs())
: slots_(app.logs(), *this, app.config(), clock_), logs_(app.logs())
{
}
@@ -531,15 +525,21 @@ public:
clear()
{
peers_.clear();
ManualClock::advance(hours(1));
clock_.advance(hours(1));
slots_.deleteIdlePeers();
}
std::uint16_t
inState(PublicKey const& validator, reduce_relay::PeerState state)
{
auto res = slots_.inState(validator, state);
return res ? *res : 0;
auto const& it = slots_.getSlots().find(validator);
if (it != slots_.getSlots().end())
return std::count_if(
it->second.getPeers().begin(),
it->second.getPeers().end(),
[&](auto const& it) { return (it.second.state == state); });
return 0;
}
void
@@ -547,15 +547,14 @@ public:
uint256 const& key,
PublicKey const& validator,
Peer::id_t id,
SquelchCB f,
protocol::MessageType type = protocol::mtVALIDATION) override
SquelchCB f) override
{
squelch_ = f;
slots_.updateSlotAndSquelch(key, validator, id, type);
slots_.updateSlotAndSquelch(key, validator, id, true);
}
void
deletePeer(id_t id, UnsquelchCB f) override
deletePeer(Peer::id_t id, UnsquelchCB f) override
{
unsquelch_ = f;
slots_.deletePeer(id, true);
@@ -632,40 +631,55 @@ public:
bool
isCountingState(PublicKey const& validator)
{
return slots_.inState(validator, reduce_relay::SlotState::Counting);
auto const& it = slots_.getSlots().find(validator);
if (it != slots_.getSlots().end())
return it->second.getState() == reduce_relay::SlotState::Counting;
return false;
}
std::set<id_t>
std::set<Peer::id_t>
getSelected(PublicKey const& validator)
{
return slots_.getSelected(validator);
auto const& it = slots_.getSlots().find(validator);
if (it == slots_.getSlots().end())
return {};
std::set<Peer::id_t> r;
for (auto const& [id, info] : it->second.getPeers())
if (info.state == reduce_relay::PeerState::Selected)
r.insert(id);
return r;
}
bool
isSelected(PublicKey const& validator, Peer::id_t peer)
{
auto selected = slots_.getSelected(validator);
auto selected = getSelected(validator);
return selected.find(peer) != selected.end();
}
id_t
Peer::id_t
getSelectedPeer(PublicKey const& validator)
{
auto selected = slots_.getSelected(validator);
auto selected = getSelected(validator);
assert(selected.size());
return *selected.begin();
}
std::unordered_map<
id_t,
std::tuple<
reduce_relay::PeerState,
std::uint16_t,
std::uint32_t,
std::uint32_t>>
std::unordered_map<Peer::id_t, reduce_relay::Slot::PeerInfo>
getPeers(PublicKey const& validator)
{
return slots_.getPeers(validator);
auto const& it = slots_.getSlots().find(validator);
if (it == slots_.getSlots().end())
return {};
auto r = std::unordered_map<Peer::id_t, reduce_relay::Slot::PeerInfo>();
for (auto const& [id, info] : it->second.getPeers())
r.emplace(std::make_pair(id, info));
return r;
}
std::uint16_t
@@ -684,17 +698,31 @@ private:
if (auto it = peers_.find(id); it != peers_.end())
squelch_(validator, it->second, squelchDuration);
}
void
squelchAll(
PublicKey const& validator,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) override
{
for (auto const& [id, peer] : peers_)
{
squelch_(validator, peer, duration);
callback(id);
}
}
void
unsquelch(PublicKey const& validator, Peer::id_t id) const override
{
if (auto it = peers_.find(id); it != peers_.end())
unsquelch_(validator, it->second);
}
SquelchCB squelch_;
UnsquelchCB unsquelch_;
Peers peers_;
Peers peersCache_;
reduce_relay::Slots<ManualClock> slots_;
BaseSquelchingTestSlots slots_;
Logs& logs_;
};
@@ -826,12 +854,8 @@ public:
LinkIterCB link,
std::uint16_t nValidators = MAX_VALIDATORS,
std::uint32_t nMessages = MAX_MESSAGES,
bool purge = true,
bool resetClock = true)
bool purge = true)
{
if (resetClock)
ManualClock::reset();
if (purge)
{
purgePeers();
@@ -840,7 +864,8 @@ public:
for (int m = 0; m < nMessages; ++m)
{
ManualClock::randAdvance(milliseconds(1800), milliseconds(2200));
overlay_.clock().randAdvance(
milliseconds(1800), milliseconds(2200));
for_rand(0, nValidators, [&](std::uint32_t v) {
validators_[v].for_links(link);
});
@@ -874,8 +899,7 @@ public:
for (auto& [_, v] : peers)
{
(void)_;
if (std::get<reduce_relay::PeerState>(v) ==
reduce_relay::PeerState::Squelched)
if (v.state == reduce_relay::PeerState::Squelched)
return false;
}
}
@@ -887,10 +911,9 @@ private:
std::vector<Validator> validators_;
};
class reduce_relay_test : public beast::unit_test::suite
class base_squelch_test : public beast::unit_test::suite
{
using Slot = reduce_relay::Slot<ManualClock>;
using id_t = Peer::id_t;
using Slot = reduce_relay::Slot;
protected:
void
@@ -901,8 +924,7 @@ protected:
<< "num peers " << (int)network_.overlay().getNumPeers()
<< std::endl;
for (auto& [k, v] : peers)
std::cout << k << ":" << (int)std::get<reduce_relay::PeerState>(v)
<< " ";
std::cout << k << ":" << to_string(v.state) << " ";
std::cout << std::endl;
}
@@ -940,7 +962,7 @@ protected:
Peer::id_t peer_;
std::uint16_t validator_;
std::optional<PublicKey> key_;
time_point<ManualClock> time_;
TestStopwatch::time_point time_;
bool handled_ = false;
};
@@ -952,12 +974,12 @@ protected:
{
std::unordered_map<EventType, Event> events{
{LinkDown, {}}, {PeerDisconnected, {}}};
time_point<ManualClock> lastCheck = ManualClock::now();
auto lastCheck = network_.overlay().clock().now();
network_.reset();
network_.propagate([&](Link& link, MessageSPtr m) {
auto& validator = link.validator();
auto now = ManualClock::now();
auto const now = network_.overlay().clock().now();
bool squelched = false;
std::stringstream str;
@@ -981,7 +1003,8 @@ protected:
str << s << " ";
if (log)
std::cout
<< (double)reduce_relay::epoch<milliseconds>(now)
<< (double)std::chrono::duration_cast<milliseconds>(
now.time_since_epoch())
.count() /
1000.
<< " random, squelched, validator: " << validator.id()
@@ -1073,10 +1096,17 @@ protected:
event.isSelected_ =
network_.overlay().isSelected(*event.key_, event.peer_);
auto peers = network_.overlay().getPeers(*event.key_);
auto d = reduce_relay::epoch<milliseconds>(now).count() -
std::get<3>(peers[event.peer_]);
auto d =
std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count() -
std::chrono::duration_cast<std::chrono::milliseconds>(
peers[event.peer_].lastMessage.time_since_epoch())
.count();
mustHandle = event.isSelected_ &&
d > milliseconds(reduce_relay::IDLED).count() &&
d > milliseconds(reduce_relay::PEER_IDLED).count() &&
network_.overlay().inState(
*event.key_, reduce_relay::PeerState::Squelched) >
0 &&
@@ -1098,7 +1128,7 @@ protected:
}
if (event.state_ == State::WaitReset ||
(event.state_ == State::On &&
(now - event.time_ > (reduce_relay::IDLED + seconds(2)))))
(now - event.time_ > (reduce_relay::PEER_IDLED + seconds(2)))))
{
bool handled =
event.state_ == State::WaitReset || !event.handled_;
@@ -1128,7 +1158,6 @@ protected:
checkCounting(PublicKey const& validator, bool isCountingState)
{
auto countingState = network_.overlay().isCountingState(validator);
BEAST_EXPECT(countingState == isCountingState);
return countingState == isCountingState;
}
@@ -1159,7 +1188,7 @@ protected:
testPeerUnsquelchedTooSoon(bool log)
{
doTest("Peer Unsquelched Too Soon", log, [this](bool log) {
BEAST_EXPECT(propagateNoSquelch(log, 1, false, false, false));
BEAST_EXPECT(propagateNoSquelch(log, 1, false, false));
});
}
@@ -1169,17 +1198,17 @@ protected:
void
testPeerUnsquelched(bool log)
{
ManualClock::advance(seconds(601));
network_.overlay().clock().advance(seconds(601));
doTest("Peer Unsquelched", log, [this](bool log) {
BEAST_EXPECT(propagateNoSquelch(log, 2, true, true, false));
BEAST_EXPECT(propagateNoSquelch(log, 2, true, true));
});
}
/** Propagate enough messages to generate one squelch event */
bool
propagateAndSquelch(bool log, bool purge = true, bool resetClock = true)
propagateAndSquelch(bool log, bool purge = true)
{
int n = 0;
int squelchEvents = 0;
network_.propagate(
[&](Link& link, MessageSPtr message) {
std::uint16_t squelched = 0;
@@ -1199,21 +1228,21 @@ protected:
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
n++;
squelchEvents++;
}
},
1,
reduce_relay::MAX_MESSAGE_THRESHOLD + 2,
purge,
resetClock);
purge);
auto selected = network_.overlay().getSelected(network_.validator(0));
BEAST_EXPECT(
selected.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(n == 1); // only one selection round
BEAST_EXPECT(squelchEvents == 1); // only one selection round
auto res = checkCounting(network_.validator(0), false);
BEAST_EXPECT(res);
return n == 1 && res;
return squelchEvents == 1 && res;
}
/** Send fewer message so that squelch event is not generated */
@@ -1222,8 +1251,7 @@ protected:
bool log,
std::uint16_t nMessages,
bool countingState,
bool purge = true,
bool resetClock = true)
bool purge = true)
{
bool squelched = false;
network_.propagate(
@@ -1239,9 +1267,9 @@ protected:
},
1,
nMessages,
purge,
resetClock);
purge);
auto res = checkCounting(network_.validator(0), countingState);
BEAST_EXPECT(res);
return !squelched && res;
}
@@ -1252,9 +1280,9 @@ protected:
testNewPeer(bool log)
{
doTest("New Peer", log, [this](bool log) {
BEAST_EXPECT(propagateAndSquelch(log, true, false));
BEAST_EXPECT(propagateAndSquelch(log, true));
network_.addPeer();
BEAST_EXPECT(propagateNoSquelch(log, 1, true, false, false));
BEAST_EXPECT(propagateNoSquelch(log, 1, true, false));
});
}
@@ -1264,8 +1292,8 @@ protected:
testSelectedPeerDisconnects(bool log)
{
doTest("Selected Peer Disconnects", log, [this](bool log) {
ManualClock::advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true, false));
network_.overlay().clock().advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true));
auto id = network_.overlay().getSelectedPeer(network_.validator(0));
std::uint16_t unsquelched = 0;
network_.overlay().deletePeer(
@@ -1288,15 +1316,16 @@ protected:
testSelectedPeerStopsRelaying(bool log)
{
doTest("Selected Peer Stops Relaying", log, [this](bool log) {
ManualClock::advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true, false));
ManualClock::advance(reduce_relay::IDLED + seconds(1));
network_.overlay().clock().advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true));
network_.overlay().clock().advance(
reduce_relay::PEER_IDLED + seconds(1));
std::uint16_t unsquelched = 0;
network_.overlay().deleteIdlePeers(
[&](PublicKey const& key, PeerWPtr const& peer) {
unsquelched++;
});
auto peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(
unsquelched ==
MAX_PEERS -
@@ -1313,12 +1342,11 @@ protected:
testSquelchedPeerDisconnects(bool log)
{
doTest("Squelched Peer Disconnects", log, [this](bool log) {
ManualClock::advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true, false));
network_.overlay().clock().advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true));
auto peers = network_.overlay().getPeers(network_.validator(0));
auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) {
return std::get<reduce_relay::PeerState>(it.second) ==
reduce_relay::PeerState::Squelched;
return it.second.state == reduce_relay::PeerState::Squelched;
});
assert(it != peers.end());
std::uint16_t unsquelched = 0;
@@ -1469,29 +1497,34 @@ vp_base_squelch_max_selected_peers=2
testBaseSquelchReady(bool log)
{
doTest("BaseSquelchReady", log, [&](bool log) {
ManualClock::reset();
auto createSlots = [&](bool baseSquelchEnabled)
-> reduce_relay::Slots<ManualClock> {
auto createSlots =
[&](bool baseSquelchEnabled,
TestStopwatch stopwatch) -> reduce_relay::Slots {
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
baseSquelchEnabled;
return reduce_relay::Slots<ManualClock>(
env_.app().logs(), network_.overlay(), env_.app().config());
return reduce_relay::Slots(
env_.app().logs(),
network_.overlay(),
env_.app().config(),
stopwatch);
};
TestStopwatch stopwatch;
// base squelching must not be ready if squelching is disabled
BEAST_EXPECT(!createSlots(false).baseSquelchReady());
BEAST_EXPECT(!createSlots(false, stopwatch).baseSquelchReady());
// base squelch must not be ready as not enough time passed from
// bootup
BEAST_EXPECT(!createSlots(true).baseSquelchReady());
BEAST_EXPECT(!createSlots(true, stopwatch).baseSquelchReady());
ManualClock::advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1});
stopwatch.advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1});
// base squelch enabled and bootup time passed
BEAST_EXPECT(createSlots(true).baseSquelchReady());
BEAST_EXPECT(createSlots(true, stopwatch).baseSquelchReady());
// even if time passed, base squelching must not be ready if turned
// off in the config
BEAST_EXPECT(!createSlots(false).baseSquelchReady());
BEAST_EXPECT(!createSlots(false, stopwatch).baseSquelchReady());
});
}
@@ -1514,7 +1547,7 @@ vp_base_squelch_max_selected_peers=2
auto peers = network_.overlay().getPeers(network_.validator(0));
// first message changes Slot state to Counting and is not counted,
// hence '-1'.
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
BEAST_EXPECT(peers[0].count == (nMessages - 1));
// add duplicate
uint256 key(nMessages - 1);
network_.overlay().updateSlotAndSquelch(
@@ -1524,9 +1557,10 @@ vp_base_squelch_max_selected_peers=2
[&](PublicKey const&, PeerWPtr, std::uint32_t) {});
// confirm the same number of messages
peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
BEAST_EXPECT(peers[0].count == (nMessages - 1));
// advance the clock
ManualClock::advance(reduce_relay::IDLED + seconds(1));
network_.overlay().clock().advance(
reduce_relay::PEER_IDLED + seconds(1));
network_.overlay().updateSlotAndSquelch(
key,
network_.validator(0),
@@ -1534,7 +1568,7 @@ vp_base_squelch_max_selected_peers=2
[&](PublicKey const&, PeerWPtr, std::uint32_t) {});
peers = network_.overlay().getPeers(network_.validator(0));
// confirm message number increased
BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
BEAST_EXPECT(peers[0].count == nMessages);
});
}
@@ -1550,6 +1584,15 @@ vp_base_squelch_max_selected_peers=2
if (duration > maxDuration_)
maxDuration_ = duration;
}
void
squelchAll(
PublicKey const&,
std::uint32_t,
std::function<void(Peer::id_t)>) override
{
}
void
unsquelch(PublicKey const&, Peer::id_t) const override
{
@@ -1566,8 +1609,11 @@ vp_base_squelch_max_selected_peers=2
auto run = [&](int npeers) {
handler.maxDuration_ = 0;
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler, env_.app().config());
reduce_relay::Slots slots(
env_.app().logs(),
handler,
env_.app().config(),
network_.overlay().clock());
// 1st message from a new peer switches the slot
// to counting state and resets the counts of all peers +
// MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold
@@ -1582,14 +1628,11 @@ vp_base_squelch_max_selected_peers=2
std::uint64_t mid = m * 1000 + peer;
uint256 const message{mid};
slots.updateSlotAndSquelch(
message,
validator,
peer,
protocol::MessageType::mtVALIDATION);
message, validator, peer, true);
}
}
// make Slot's internal hash router expire all messages
ManualClock::advance(hours(1));
network_.overlay().clock().advance(hours(1));
};
using namespace reduce_relay;
@@ -1703,7 +1746,7 @@ vp_base_squelch_max_selected_peers=2
Network network_;
public:
reduce_relay_test()
base_squelch_test()
: env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
@@ -1732,7 +1775,7 @@ public:
}
};
class reduce_relay_simulate_test : public reduce_relay_test
class base_squelch_simulate_test : public base_squelch_test
{
void
testRandom(bool log)
@@ -1748,8 +1791,8 @@ class reduce_relay_simulate_test : public reduce_relay_test
}
};
BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple);
BEAST_DEFINE_TESTSUITE(base_squelch, ripple_data, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(base_squelch_simulate, ripple_data, ripple);
} // namespace test

View File

@@ -0,0 +1,980 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/Env.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/Slot.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/protocol/SecretKey.h>
#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>
namespace ripple {
namespace test {
class TestHandler : public reduce_relay::SquelchHandler
{
public:
using squelch_method =
std::function<void(PublicKey const&, Peer::id_t, std::uint32_t)>;
using squelchAll_method = std::function<
void(PublicKey const&, std::uint32_t, std::function<void(Peer::id_t)>)>;
using unsquelch_method = std::function<void(PublicKey const&, Peer::id_t)>;
squelch_method squelch_f_;
squelchAll_method squelchAll_f_;
unsquelch_method unsquelch_f_;
TestHandler(
squelch_method const& squelch_f,
squelchAll_method const& squelchAll_f,
unsquelch_method const& unsquelch_f)
: squelch_f_(squelch_f)
, squelchAll_f_(squelchAll_f)
, unsquelch_f_(unsquelch_f)
{
}
TestHandler(TestHandler& copy)
{
squelch_f_ = copy.squelch_f_;
squelchAll_f_ = copy.squelchAll_f_;
unsquelch_f_ = copy.unsquelch_f_;
}
void
squelch(PublicKey const& validator, Peer::id_t peer, std::uint32_t duration)
const override
{
squelch_f_(validator, peer, duration);
}
void
squelchAll(
PublicKey const& validator,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) override
{
squelchAll_f_(validator, duration, callback);
}
void
unsquelch(PublicKey const& validator, Peer::id_t peer) const override
{
unsquelch_f_(validator, peer);
}
};
class EnhancedSquelchingTestSlots : public reduce_relay::Slots
{
using Slots = reduce_relay::Slots;
public:
EnhancedSquelchingTestSlots(
Logs& logs,
reduce_relay::SquelchHandler& handler,
Config const& config,
reduce_relay::Slots::clock_type& clock)
: Slots(logs, handler, config, clock)
{
}
Slots::slots_map const&
getSlots(bool trusted) const
{
if (trusted)
return trustedSlots_;
return untrustedSlots_;
}
hash_map<PublicKey, ValidatorInfo> const&
getConsideredValidators()
{
return consideredValidators_;
}
std::optional<PublicKey>
updateConsideredValidator(PublicKey const& validator, Peer::id_t peerID)
{
return Slots::updateConsideredValidator(validator, peerID);
}
void
squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID)
{
Slots::registerSquelchedValidator(validatorKey, peerID);
}
bool
validatorSquelched(PublicKey const& validatorKey)
{
return Slots::expireAndIsValidatorSquelched(validatorKey);
}
bool
peerSquelched(PublicKey const& validatorKey, Peer::id_t peerID)
{
return Slots::expireAndIsPeerSquelched(validatorKey, peerID);
}
};
class enhanced_squelch_test : public beast::unit_test::suite
{
public:
TestHandler::squelch_method noop_squelch =
[&](PublicKey const&, Peer::id_t, std::uint32_t) {
BEAST_EXPECTS(false, "unexpected call to squelch handler");
};
TestHandler::squelchAll_method noop_squelchAll =
[&](PublicKey const&, std::uint32_t, std::function<void(Peer::id_t)>) {
BEAST_EXPECTS(false, "unexpected call to squelchAll handler");
};
TestHandler::unsquelch_method noop_unsquelch = [&](PublicKey const&,
Peer::id_t) {
BEAST_EXPECTS(false, "unexpected call to unsquelch handler");
};
// noop_handler is passed as a place holder Handler to slots
TestHandler noop_handler = {
noop_squelch,
noop_squelchAll,
noop_unsquelch,
};
jtx::Env env_;
enhanced_squelch_test() : env_(*this)
{
env_.app().config().VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = true;
}
void
testConfig()
{
testcase("Test Config - enabled enhanced squelching");
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enhanced_squelch_enable=1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == true);
toLoad = R"rippleConfig(
[reduce_relay]
vp_enhanced_squelch_enable=0
)rippleConfig";
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
toLoad = R"rippleConfig(
[reduce_relay]
)rippleConfig";
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
}
/** Tests tracking for squelched validators and peers */
void
testSquelchTracking()
{
testcase("squelchTracking");
Peer::id_t const squelchedPeerID = 0;
Peer::id_t const newPeerID = 1;
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
auto const publicKey = randomKeyPair(KeyType::ed25519).first;
// a new key should not be squelched
BEAST_EXPECTS(
!slots.validatorSquelched(publicKey), "validator squelched");
slots.squelchValidator(publicKey, squelchedPeerID);
// after squelching a peer, the validator must be squelched
BEAST_EXPECTS(
slots.validatorSquelched(publicKey), "validator not squelched");
// the peer must also be squelched
BEAST_EXPECTS(
slots.peerSquelched(publicKey, squelchedPeerID),
"peer not squelched");
// a new peer must not be squelched
BEAST_EXPECTS(
!slots.peerSquelched(publicKey, newPeerID), "new peer squelched");
// advance the manual clock to after expiration
stopwatch.advance(
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
std::chrono::seconds{11});
// validator squelch should expire
BEAST_EXPECTS(
!slots.validatorSquelched(publicKey),
"validator squelched after expiry");
// peer squelch should also expire
BEAST_EXPECTS(
!slots.peerSquelched(publicKey, squelchedPeerID),
"validator squelched after expiry");
}
void
testUpdateValidatorSlot_newValidator()
{
testcase("updateValidatorSlot_newValidator");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
Peer::id_t const peerID = 1;
auto const validator = randomKeyPair(KeyType::ed25519).first;
uint256 const message{0};
slots.updateUntrustedValidatorSlot(message, validator, peerID);
// adding untrusted slot does not effect trusted slots
BEAST_EXPECTS(
slots.getSlots(true).size() == 0, "trusted slots changed");
// we expect that the validator was not added to untrusted slots
BEAST_EXPECTS(
slots.getSlots(false).size() == 0, "untrusted slot changed");
// we expect that the validator was added to th consideration list
BEAST_EXPECTS(
slots.getConsideredValidators().contains(validator),
"new validator was not considered");
}
void
testUpdateValidatorSlot_squelchedValidator()
{
testcase("testUpdateValidatorSlot_squelchedValidator");
Peer::id_t const squelchedPeerID = 0;
Peer::id_t const newPeerID = 1;
auto const validator = randomKeyPair(KeyType::ed25519).first;
TestHandler::squelch_method const squelch_f =
[&](PublicKey const& key, Peer::id_t id, std::uint32_t duration) {
BEAST_EXPECTS(
key == validator,
"squelch called for unknown validator key");
BEAST_EXPECTS(
id == newPeerID, "squelch called for the wrong peer");
};
TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
slots.squelchValidator(validator, squelchedPeerID);
// this should not trigger squelch assertions, the peer is squelched
slots.updateUntrustedValidatorSlot(
sha512Half(validator), validator, squelchedPeerID);
slots.updateUntrustedValidatorSlot(
sha512Half(validator), validator, newPeerID);
// the squelched peer remained squelched
BEAST_EXPECTS(
slots.peerSquelched(validator, squelchedPeerID),
"peer not squelched");
// because the validator was squelched, the new peer was also squelched
BEAST_EXPECTS(
slots.peerSquelched(validator, newPeerID),
"new peer was not squelched");
// a squelched validator must not be considered
BEAST_EXPECTS(
!slots.getConsideredValidators().contains(validator),
"squelched validator was added for consideration");
}
void
testUpdateValidatorSlot_slotsFull()
{
testcase("updateValidatorSlot_slotsFull");
Peer::id_t const peerID = 1;
// while there are open untrusted slots, no calls should be made to
// squelch any validators
TestHandler handler{noop_handler};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
// saturate validator slots
auto const validators = fillUntrustedSlots(slots);
// adding untrusted slot does not effect trusted slots
BEAST_EXPECTS(
slots.getSlots(true).size() == 0, "trusted slots changed");
// simulate additional messages from already selected validators
for (auto const& validator : validators)
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD; ++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i),
validator,
peerID);
// an untrusted slot was added for each validator
BEAST_EXPECT(
slots.getSlots(false).size() == reduce_relay::MAX_UNTRUSTED_SLOTS);
for (auto const& validator : validators)
BEAST_EXPECTS(
!slots.validatorSquelched(validator),
"selected validator was squelched");
auto const newValidator = randomKeyPair(KeyType::ed25519).first;
// once slots are full squelchAll must be called for new peer/validator
handler.squelchAll_f_ = [&](PublicKey const& key,
std::uint32_t,
std::function<void(Peer::id_t)> callback) {
BEAST_EXPECTS(
key == newValidator, "unexpected validator squelched");
callback(peerID);
};
slots.updateUntrustedValidatorSlot(
sha512Half(newValidator), newValidator, peerID);
// Once the slots are saturated every other validator is squelched
BEAST_EXPECTS(
slots.validatorSquelched(newValidator),
"untrusted validator not squelched");
BEAST_EXPECTS(
slots.peerSquelched(newValidator, peerID),
"peer for untrusted validator not squelched");
}
void
testDeleteIdlePeers_deleteIdleSlots()
{
testcase("deleteIdlePeers");
TestHandler handler{noop_handler};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
auto keys = fillUntrustedSlots(slots);
// verify that squelchAll is called for each idled slot validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) {
for (auto it = keys.begin(); it != keys.end(); ++it)
{
if (*it == actualKey)
{
keys.erase(it);
return;
}
}
BEAST_EXPECTS(false, "unexpected key passed to squelchAll");
};
BEAST_EXPECTS(
slots.getSlots(false).size() == reduce_relay::MAX_UNTRUSTED_SLOTS,
"unexpected number of untrusted slots");
// advance the manual clock to after slot expiration
stopwatch.advance(
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
std::chrono::seconds{1});
slots.deleteIdlePeers();
BEAST_EXPECTS(
slots.getSlots(false).size() == 0,
"unexpected number of untrusted slots");
BEAST_EXPECTS(keys.empty(), "not all validators were squelched");
}
void
testDeleteIdlePeers_deleteIdleUntrustedPeer()
{
testcase("deleteIdleUntrustedPeer");
Peer::id_t const peerID = 1;
Peer::id_t const peerID2 = 2;
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
// fill one untrustd validator slot
auto const validator = fillUntrustedSlots(slots, 1)[0];
BEAST_EXPECTS(
slots.getSlots(false).size() == 1,
"unexpected number of untrusted slots");
slots.updateSlotAndSquelch(
sha512Half(validator) + static_cast<uint256>(100),
validator,
peerID,
false);
slots.updateSlotAndSquelch(
sha512Half(validator) + static_cast<uint256>(100),
validator,
peerID2,
false);
slots.deletePeer(peerID, true);
auto const slotPeers = getUntrustedSlotPeers(validator, slots);
BEAST_EXPECTS(
slotPeers.size() == 1, "untrusted validator slot is missing");
BEAST_EXPECTS(
!slotPeers.contains(peerID),
"peer was not removed from untrusted slots");
BEAST_EXPECTS(
slotPeers.contains(peerID2),
"peer was removed from untrusted slots");
}
/** Test that untrusted validator slots are correctly updated by
* updateSlotAndSquelch
*/
void
testUpdateSlotAndSquelch_untrustedValidator()
{
testcase("updateUntrsutedValidatorSlot");
TestHandler handler{noop_handler};
handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
// peers that will be source of validator messages
std::vector<Peer::id_t> peers = {};
// prepare n+1 peers, we expect the n+1st peer will be squelched
for (int i = 0; i <
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + 1;
++i)
peers.push_back(i);
auto const validator = fillUntrustedSlots(slots, 1)[0];
// Squelching logic resets all counters each time a new peer is added
// Therfore we need to populate counters for each peer before sending
// new messages
for (auto const& peer : peers)
{
auto const now = stopwatch.now();
slots.updateSlotAndSquelch(
sha512Half(validator) +
static_cast<uint256>(now.time_since_epoch().count()),
validator,
peer,
false);
stopwatch.advance(std::chrono::milliseconds{10});
}
// simulate new, unique validator messages sent by peers
for (auto const& peer : peers)
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
{
auto const now = stopwatch.now();
slots.updateSlotAndSquelch(
sha512Half(validator) +
static_cast<uint256>(now.time_since_epoch().count()),
validator,
peer,
false);
stopwatch.advance(std::chrono::milliseconds{10});
}
auto const slotPeers = getUntrustedSlotPeers(validator, slots);
BEAST_EXPECTS(
slotPeers.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS +
1,
"untrusted validator slot is missing");
int selected = 0;
int squelched = 0;
for (auto const& [_, info] : slotPeers)
{
switch (info.state)
{
case reduce_relay::PeerState::Selected:
++selected;
break;
case reduce_relay::PeerState::Squelched:
++squelched;
break;
case reduce_relay::PeerState::Counting:
BEAST_EXPECTS(
false, "peer should not be in counting state");
}
}
BEAST_EXPECTS(squelched == 1, "expected one squelched peer");
BEAST_EXPECTS(
selected ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
"wrong number of peers selected");
}
void
testUpdateConsideredValidator_new()
{
testcase("testUpdateConsideredValidator_new");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
// insert some random validator key
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t const peerID = 0;
Peer::id_t const peerID2 = 1;
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
BEAST_EXPECTS(
slots.getConsideredValidators().contains(validator),
"new validator was not added for consideration");
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
// expect that a peer will be registered once as a message source
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).peers.size() == 1,
"duplicate peer was registered");
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID2),
"validator was selected with insufficient number of peers");
// expect that each distinct peer will be registered
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).peers.size() == 2,
"distinct peers were not registered");
}
void
testUpdateConsideredValidator_idle()
{
testcase("testUpdateConsideredValidator_idle");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
// insert some random validator key
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t peerID = 0;
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
BEAST_EXPECTS(
slots.getConsideredValidators().contains(validator),
"new validator was not added for consideration");
auto const state = slots.getConsideredValidators().at(validator);
// simulate a validator sending a new message before the idle timer
stopwatch.advance(reduce_relay::PEER_IDLED - std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
auto const newState = slots.getConsideredValidators().at(validator);
BEAST_EXPECTS(
state.count + 1 == newState.count,
"non-idling validator was updated");
// simulate a validator idling
stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
}
void
testUpdateConsideredValidator_selectQualifying()
{
testcase("testUpdateConsideredValidator_selectQualifying");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
// insert some random validator key
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t peerID = 0;
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD - 1; ++i)
{
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected before reaching message threshold");
stopwatch.advance(
reduce_relay::PEER_IDLED - std::chrono::seconds(1));
}
auto const consideredValidator =
slots.updateConsideredValidator(validator, peerID);
BEAST_EXPECTS(
consideredValidator && *consideredValidator == validator,
"expected validator was not selected");
// expect that selected peer was removed
BEAST_EXPECTS(
!slots.getConsideredValidators().contains(validator),
"selected validator was not removed from considered list");
}
void
testCleanConsideredValidators_resetIdle()
{
testcase("testCleanConsideredValidators_resetIdle");
auto const validator = randomKeyPair(KeyType::ed25519).first;
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
// send enough messages for a slot to meet peer requirements
for (int i = 0;
i < env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i), validator, i);
// send enough messages from some peer to be one message away from
// meeting the selection criteria
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD -
(env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS +
1);
++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i), validator, 0);
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).count ==
reduce_relay::MAX_MESSAGE_THRESHOLD - 1,
"considered validator information is in an invalid state");
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).peers.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
"considered validator information is in an invalid state");
stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds{1});
// deleteIdlePeers must reset the progress of a validator that idled
slots.deleteIdlePeers();
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(1), validator, 0);
// we expect that the validator was not selected
BEAST_EXPECTS(
slots.getSlots(false).size() == 0, "untrusted slot was created");
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).count == 1,
"considered validator information is in an invalid state");
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).peers.size() == 1,
"considered validator information is in an invalid state");
}
void
testCleanConsideredValidators_deletePoorlyConnected()
{
testcase("cleanConsideredValidators_deletePoorlyConnected");
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t const peerID = 0;
TestHandler handler{noop_handler};
// verify that squelchAll is called for poorly connected validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) {
BEAST_EXPECTS(
actualKey == validator, "unexpected key passed to squelchAll");
callback(peerID);
};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
// send enough messages from a single peer
for (int i = 0; i < 2 * reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i),
validator,
peerID);
stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds{1});
// deleteIdlePeers must squelch the validator as it failed to reach
// peering requirements
slots.deleteIdlePeers();
BEAST_EXPECTS(
slots.getConsideredValidators().size() == 0,
"poorly connected validator was not deleted");
}
void
testCleanConsideredValidators_deleteSilent()
{
testcase("cleanConsideredValidators_deleteSilent");
// insert some random validator key
auto const idleValidator = randomKeyPair(KeyType::ed25519).first;
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t const peerID = 0;
TestHandler handler{noop_handler};
// verify that squelchAll is called for idle validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) {
BEAST_EXPECTS(
actualKey == idleValidator,
"unexpected key passed to squelchAll");
callback(peerID);
};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
BEAST_EXPECTS(
!slots.updateConsideredValidator(idleValidator, peerID),
"validator was selected with insufficient number of peers");
BEAST_EXPECTS(
slots.getConsideredValidators().contains(idleValidator),
"new validator was not added for consideration");
// simulate a validator idling
stopwatch.advance(
reduce_relay::MAX_UNTRUSTED_VALIDATOR_IDLE +
std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
slots.deleteIdlePeers();
BEAST_EXPECTS(
!slots.getConsideredValidators().contains(idleValidator),
"late validator was not removed");
BEAST_EXPECTS(
slots.getConsideredValidators().contains(validator),
"timely validator was removed");
}
void
testSquelchUntrustedValidator_consideredListCleared()
{
testcase("testSquelchUntrustedValidator");
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t const peerID = 0;
TestHandler handler{noop_handler};
// verify that squelchAll is called for idle validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) {
BEAST_EXPECTS(
actualKey == validator, "unexpected key passed to squelchAll");
};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
// add the validator to the considered list
slots.updateUntrustedValidatorSlot(
sha512Half(validator), validator, peerID);
BEAST_EXPECTS(
slots.getConsideredValidators().contains(validator),
"validator was not added to considered list");
slots.squelchUntrustedValidator(validator);
BEAST_EXPECTS(
!slots.getConsideredValidators().contains(validator),
"validator was not removed from considered list");
}
void
testSquelchUntrustedValidator_slotEvicted()
{
testcase("testSquelchUntrustedValidator_slotEvicted");
TestHandler handler{noop_handler};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
// assign a slot to the untrusted validator
auto const validators = fillUntrustedSlots(slots, 1);
// verify that squelchAll is called for idle validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) {
BEAST_EXPECTS(
actualKey == validators[0],
"unexpected key passed to squelchAll");
};
BEAST_EXPECTS(
slots.getSlots(false).contains(validators[0]),
"a slot was not assigned to a validator");
slots.squelchUntrustedValidator(validators[0]);
BEAST_EXPECTS(
!slots.getSlots(false).contains(validators[0]),
"a slot was not evicted");
}
private:
/** A helper method to fill untrusted slots of a given Slots instance
* with random validator messages*/
std::vector<PublicKey>
fillUntrustedSlots(
EnhancedSquelchingTestSlots& slots,
int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS)
{
std::vector<PublicKey> keys;
for (int i = 0; i < maxSlots; ++i)
{
auto const validator = randomKeyPair(KeyType::ed25519).first;
keys.push_back(validator);
for (int j = 0; j <
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
++j)
// send enough messages so that a validator slot is selected
for (int k = 0; k < reduce_relay::MAX_MESSAGE_THRESHOLD; ++k)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(k),
validator,
j);
}
return keys;
}
std::unordered_map<Peer::id_t, reduce_relay::Slot::PeerInfo>
getUntrustedSlotPeers(
PublicKey const& validator,
EnhancedSquelchingTestSlots const& slots)
{
auto const& it = slots.getSlots(false).find(validator);
if (it == slots.getSlots(false).end())
return {};
auto r = std::unordered_map<Peer::id_t, reduce_relay::Slot::PeerInfo>();
for (auto const& [id, info] : it->second.getPeers())
r.emplace(std::make_pair(id, info));
return r;
}
void
run() override
{
testConfig();
testSquelchTracking();
testUpdateValidatorSlot_newValidator();
testUpdateValidatorSlot_slotsFull();
testUpdateValidatorSlot_squelchedValidator();
testDeleteIdlePeers_deleteIdleSlots();
testDeleteIdlePeers_deleteIdleUntrustedPeer();
testUpdateSlotAndSquelch_untrustedValidator();
testUpdateConsideredValidator_new();
testUpdateConsideredValidator_idle();
testUpdateConsideredValidator_selectQualifying();
testCleanConsideredValidators_deleteSilent();
testCleanConsideredValidators_resetIdle();
testCleanConsideredValidators_deletePoorlyConnected();
testSquelchUntrustedValidator_consideredListCleared();
testSquelchUntrustedValidator_slotEvicted();
}
};
BEAST_DEFINE_TESTSUITE(enhanced_squelch, overlay, ripple);
} // namespace test
} // namespace ripple

View File

@@ -0,0 +1,164 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/Env.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/SquelchStore.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/protocol/PublicKey.h>
#include <chrono>
namespace ripple {
namespace test {
class TestSquelchStore : public reduce_relay::SquelchStore
{
public:
TestSquelchStore(beast::Journal journal, TestStopwatch& clock)
: reduce_relay::SquelchStore(journal, clock)
{
}
hash_map<PublicKey, TestStopwatch::time_point> const&
getSquelched() const
{
return squelched_;
}
};
class squelch_store_test : public beast::unit_test::suite
{
using seconds = std::chrono::seconds;
public:
jtx::Env env_;
squelch_store_test() : env_(*this)
{
}
void
testHandleSquelch()
{
testcase("SquelchStore handleSquelch");
TestStopwatch clock;
auto store = TestSquelchStore(env_.journal, clock);
auto const validator = randomKeyPair(KeyType::ed25519).first;
// attempt to squelch the peer with a too small duration
store.handleSquelch(
validator, true, reduce_relay::MIN_UNSQUELCH_EXPIRE - seconds{1});
// the peer must not be squelched
BEAST_EXPECTS(!store.isSquelched(validator), "peer is squelched");
// attempt to squelch the peer with a too big duration
store.handleSquelch(
validator,
true,
reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS + seconds{1});
// the peer must not be squelched
BEAST_EXPECTS(!store.isSquelched(validator), "peer is squelched");
// squelch the peer with a good duration
store.handleSquelch(
validator, true, reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1});
// the peer for the validator should be squelched
BEAST_EXPECTS(
store.isSquelched(validator),
"peer and validator are not squelched");
// unsquelch the validator
store.handleSquelch(validator, false, seconds{0});
BEAST_EXPECTS(!store.isSquelched(validator), "peer is squelched");
}
void
testIsSquelched()
{
testcase("SquelchStore IsSquelched");
TestStopwatch clock;
auto store = TestSquelchStore(env_.journal, clock);
auto const validator = randomKeyPair(KeyType::ed25519).first;
auto const duration = reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1};
store.handleSquelch(
validator, true, reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1});
BEAST_EXPECTS(
store.isSquelched(validator),
"peer and validator are not squelched");
clock.advance(duration + seconds{1});
// the peer with short squelch duration must be not squelched
BEAST_EXPECTS(
!store.isSquelched(validator), "peer and validator are squelched");
}
void
testClearExpiredSquelches()
{
testcase("SquelchStore testClearExpiredSquelches");
TestStopwatch clock;
auto store = TestSquelchStore(env_.journal, clock);
auto const validator = randomKeyPair(KeyType::ed25519).first;
auto const duration = reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1};
store.handleSquelch(validator, true, duration);
BEAST_EXPECTS(
store.getSquelched().size() == 1,
"validators were not registered in the store");
clock.advance(duration + seconds{1});
auto const validator2 = randomKeyPair(KeyType::ed25519).first;
auto const duration2 = reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{2};
store.handleSquelch(validator2, true, duration2);
BEAST_EXPECTS(
!store.getSquelched().contains(validator),
"expired squelch was not deleted");
BEAST_EXPECTS(
store.getSquelched().contains(validator2),
"validators were not registered in the store");
}
void
run() override
{
testHandleSquelch();
testIsSquelched();
testClearExpiredSquelches();
}
};
BEAST_DEFINE_TESTSUITE(squelch_store, ripple_data, ripple);
} // namespace test
} // namespace ripple

View File

@@ -254,6 +254,9 @@ public:
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5;
///////////////// END OF TEMPORARY CODE BLOCK /////////////////////
// Enable enhanced squelching of unique untrusted validator messages
bool VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = false;
// Transaction reduce-relay feature
bool TX_REDUCE_RELAY_ENABLE = false;
// If tx reduce-relay feature is disabled

View File

@@ -775,6 +775,9 @@ Config::loadFromString(std::string const& fileContents)
"greater than or equal to 3");
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE =
sec.value_or("vp_enhanced_squelch_enable", false);
TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);

View File

@@ -72,7 +72,6 @@ Previous-Ledger: q4aKbP7sd5wv+EXArwCmQiWZhq9AwBl2p/hCtpGJNsc=
##### Example HTTP Upgrade Response (Success)
```
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
@@ -102,9 +101,9 @@ Content-Type: application/json
#### Standard Fields
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `User-Agent` | :heavy_check_mark: | |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `User-Agent` | :heavy_check_mark: | |
The `User-Agent` field indicates the version of the software that the
peer that is making the HTTP request is using. No semantic meaning is
@@ -113,9 +112,9 @@ specify the version of the software that is used.
See [RFC2616 &sect;14.43](https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.43).
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Server` | | :heavy_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Server` | | :heavy_check_mark: |
The `Server` field indicates the version of the software that the
peer that is processing the HTTP request is using. No semantic meaning is
@@ -124,18 +123,18 @@ specify the version of the software that is used.
See [RFC2616 &sect;14.38](https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.38).
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Connection` | :heavy_check_mark: | :heavy_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Connection` | :heavy_check_mark: | :heavy_check_mark: |
The `Connection` field should have a value of `Upgrade` to indicate that a
request to upgrade the connection is being performed.
See [RFC2616 &sect;14.10](https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.10).
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Upgrade` | :heavy_check_mark: | :heavy_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Upgrade` | :heavy_check_mark: | :heavy_check_mark: |
The `Upgrade` field is part of the standard connection upgrade mechanism and
must be present in both requests and responses. It is used to negotiate the
@@ -156,12 +155,11 @@ equal to 2 and the minor is greater than or equal to 0.
See [RFC 2616 &sect;14.42](https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.42)
#### Custom Fields
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Connect-As` | :heavy_check_mark: | :heavy_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Connect-As` | :heavy_check_mark: | :heavy_check_mark: |
The mandatory `Connect-As` field is used to specify that type of connection
that is being requested.
@@ -175,10 +173,9 @@ elements specified in the request. If a server processing a request does not
recognize any of the connection types, the request should fail with an
appropriate HTTP error code (e.g. by sending an HTTP 400 "Bad Request" response).
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Remote-IP` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Remote-IP` | :white_check_mark: | :white_check_mark: |
The optional `Remote-IP` field contains the string representation of the IP
address of the remote end of the connection as seen from the peer that is
@@ -187,10 +184,9 @@ sending the field.
By observing values of this field from a sufficient number of different
servers, a peer making outgoing connections can deduce its own IP address.
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Local-IP` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Local-IP` | :white_check_mark: | :white_check_mark: |
The optional `Local-IP` field contains the string representation of the IP
address that the peer sending the field believes to be its own.
@@ -198,10 +194,9 @@ address that the peer sending the field believes to be its own.
Servers receiving this field can detect IP address mismatches, which may
indicate a potential man-in-the-middle attack.
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Network-ID` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Network-ID` | :white_check_mark: | :white_check_mark: |
The optional `Network-ID` can be used to identify to which of several
[parallel networks](https://xrpl.org/parallel-networks.html) the server
@@ -217,10 +212,9 @@ If a server configured to join one network receives a connection request from a
server configured to join another network, the request should fail with an
appropriate HTTP error code (e.g. by sending an HTTP 400 "Bad Request" response).
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Network-Time` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Network-Time` | :white_check_mark: | :white_check_mark: |
The optional `Network-Time` field reports the current [time](https://xrpl.org/basic-data-types.html#specifying-time)
according to sender's internal clock.
@@ -232,20 +226,18 @@ each other with an appropriate HTTP error code (e.g. by sending an HTTP 400
It is highly recommended that servers synchronize their clocks using time
synchronization software. For more on this topic, please visit [ntp.org](http://www.ntp.org/).
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Public-Key` | :heavy_check_mark: | :heavy_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Public-Key` | :heavy_check_mark: | :heavy_check_mark: |
The mandatory `Public-Key` field identifies the sending server's public key,
encoded in base58 using the standard encoding for node public keys.
See: https://xrpl.org/base58-encodings.html
See: <https://xrpl.org/base58-encodings.html>
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Server-Domain` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Server-Domain` | :white_check_mark: | :white_check_mark: |
The optional `Server-Domain` field allows a server to report the domain that
it is operating under. The value is configured by the server administrator in
@@ -259,10 +251,9 @@ under the specified domain and locating the public key of this server under the
Sending a malformed domain will prevent a connection from being established.
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Session-Signature` | :heavy_check_mark: | :heavy_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Session-Signature` | :heavy_check_mark: | :heavy_check_mark: |
The `Session-Signature` field is mandatory and is used to secure the peer link
against certain types of attack. For more details see "Session Signature" below.
@@ -272,36 +263,35 @@ should support both **Base64** and **HEX** encoding for this value.
For more details on this field, please see **Session Signature** below.
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Crawl` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Crawl` | :white_check_mark: | :white_check_mark: |
The optional `Crawl` field can be used by a server to indicate whether peers
should include it in crawl reports.
The field can take two values:
- **`Public`**: The server's IP address and port should be included in crawl
reports.
- **`Private`**: The server's IP address and port should not be included in
crawl reports. _This is the default, if the field is omitted._
For more on the Peer Crawler, please visit https://xrpl.org/peer-crawler.html.
For more on the Peer Crawler, please visit <https://xrpl.org/peer-crawler.html>.
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Closed-Ledger` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Closed-Ledger` | :white_check_mark: | :white_check_mark: |
If present, identifies the hash of the last ledger that the sending server
considers to be closed.
The value is encoded as **HEX**, but implementations should support both
**Base64** and **HEX** encoding for this value for legacy purposes.
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Previous-Ledger` | :white_check_mark: | :white_check_mark: |
| Field Name | Request | Response |
|--------------------- |:-----------------: |:-----------------: |
| `Previous-Ledger` | :white_check_mark: | :white_check_mark: |
If present, identifies the hash of the parent ledger that the sending server
considers to be closed.
@@ -317,7 +307,6 @@ and values in both requests and responses.
Implementations should not reject requests because of the presence of fields
that they do not understand.
### Session Signature
Even for SSL/TLS encrypted connections, it is possible for an attacker to mount
@@ -365,10 +354,52 @@ transferred between A and B and will not be able to intelligently tamper with th
message stream between Alice and Bob, although she may be still be able to inject
delays or terminate the link.
## Peer-to-Peer Traffic Routing
# Ripple Clustering #
### Squelching
A cluster consists of more than one Ripple server under common
Validator Squelching is a network feature that reduces redundant message traffic by intelligently selecting a small subset of peers to listen to for each validator. Messages from non-selected peers are temporarily ignored, or "squelched." This process significantly cuts down on processing overhead and provides dynamic fault tolerance by allowing the network to ignore misbehaving peers without a permanent ban. The system continuously re-evaluates peer performance to adapt to changing network conditions.
### Components
The squelching architecture is built on five key classes:
- `SquelchStore.h`: A low-level, timed key-value store that maps a validator to its squelch expiration timestamp.
- `Slot.h/Slot`: Manages the state for a single validator, tracking all peers that relay its messages. It operates in a Counting state to gather peer performance data and a Selected state after choosing the best peers and squelching the rest. It handles peer disconnections and idleness to keep the selection optimal.
- `Slot.h/Slots`: The central container that manages all active Slot instances. It applies different policies for trusted and untrusted validators, evaluates candidates for the limited untrusted slots, and runs periodic cleanup routines.
- `OverlayImpl.h`: Integrates the squelching system with the network, capturing events and dispatching them to the Slots container in a thread-safe manner.
- `PeerImp.h` - Handles squelch messages, and calls `OverlayImpl.h` when it received proposal or validation messages.
### Component Dependency
The component dependencies follow a clear hierarchy from the network layer down to individual peers:
- `OverlayImpl`: The top-level component that owns a single instance of Slots, and a currently connected Peers.
- `Slots`: This central orchestrator owns and manages a collection of many Slot instances.
- `Slot`: Each Slot represents a single validator and manages the state of all PeerImp instances that relay messages for it.
- `PeerImp`: Represents a connected peer and owns its own instance of SquelchStore to manage its local squelch state for various validators.
### The Squelching Lifecycle
When a message from a validator arrives, it is dispatched to the appropriate Slot. The Slot, initially in a Counting state, tracks message volume from each peer. Once enough data is gathered, it triggers a selection, randomly choosing a small number of the best-performing peers. The Slot then instructs a SquelchHandler to squelch all non-selected peers for a calculated duration and transitions to a Selected state. The system continuously monitors for network changes, such as a selected peer disconnecting, which causes the Slot to reset to the Counting state and begin a new evaluation.
### Trusted vs. Untrusted Validators
The system applies different policies based on validator trust status:
- **Trusted Validators**: Are granted a Slot immediately to optimize traffic from known-good sources.
- **Untrusted Validators**: Are handled more cautiously, especially when the Enhanced Squelching feature is enabled. They must compete for a fixed number of limited slots by first proving their reliability in a "consideration pool." Validators that fail to gain a slot or become idle are aggressively squelched across all peers. This can also be triggered by a network-wide consensus to ignore a specific untrusted validator.
This dual-policy approach optimizes trusted traffic while robustly protecting the network from potentially malicious or unknown validators.
## XRP Ledger Clustering
A cluster consists of more than one XRP Ledger server under common
administration that share load information, distribute cryptography
operations, and provide greater response consistency.
@@ -378,7 +409,7 @@ Cluster nodes share information about their internal load status. Cluster
nodes do not have to verify the cryptographic signatures on messages
received from other cluster nodes.
## Configuration ##
### Configuration
A server's public key can be determined from the output of the `server_info`
command. The key is in the `pubkey_node` value, and is a text string
@@ -404,7 +435,7 @@ New spokes can be added as follows:
- Restart each hub, one by one
- Restart the spoke
## Transaction Behavior ##
### Transaction Behavior
When a transaction is received from a cluster member, several normal checks
are bypassed:
@@ -420,7 +451,7 @@ does not meet its current relay fee. It is preferable to keep the cluster
in agreement and permit confirmation from one cluster member to more
reliably indicate the transaction's acceptance by the cluster.
## Server Load Information ##
### Server Load Information
Cluster members exchange information on their server's load level. The load
level is essentially the amount by which the normal fee levels are multiplied
@@ -431,7 +462,7 @@ fee, is the highest of its local load level, the network load level, and the
cluster load level. The cluster load level is the median load level reported
by a cluster member.
## Gossip ##
### Gossip
Gossip is the mechanism by which cluster members share information about
endpoints (typically IPv4 addresses) that are imposing unusually high load
@@ -446,7 +477,7 @@ the servers in a cluster. With gossip, if he chooses to use the same IP
address to impose load on more than one server, he will find that the amount
of load he can impose before getting disconnected is much lower.
## Monitoring ##
### Monitoring
The `peers` command will report on the status of the cluster. The `cluster`
object will contain one entry for each member of the cluster (either configured

View File

@@ -21,6 +21,7 @@
#define RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED
#include <chrono>
#include <cstdint>
namespace ripple {
@@ -39,19 +40,31 @@ static constexpr auto MIN_UNSQUELCH_EXPIRE = std::chrono::seconds{300};
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600};
static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10);
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600};
// No message received threshold before identifying a peer as idled
static constexpr auto IDLED = std::chrono::seconds{8};
static constexpr auto PEER_IDLED = std::chrono::seconds{8};
// Message count threshold to start selecting peers as the source
// of messages from the validator. We add peers who reach
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
// reach MAX_MESSAGE_THRESHOLD.
static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19;
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20;
// Max selected peers to choose as the source of messages from validator
static constexpr uint16_t MAX_SELECTED_PEERS = 5;
// Max number of untrusted slots the server will maintain
static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 30;
// The maximum of seconds an untrusted validator can go without sending a
// validation message. After this, a validator may be squelched
static constexpr auto MAX_UNTRUSTED_VALIDATOR_IDLE = std::chrono::seconds{30};
// Wait before reduce-relay feature is enabled on boot up to let
// the server establish peer connections
static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};
// Maximum size of the aggregated transaction hashes per peer.
// Once we get to high tps throughput, this cap will prevent
// TMTransactions from exceeding the current protocol message

File diff suppressed because it is too large Load Diff

View File

@@ -1,129 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h>
#include <algorithm>
#include <chrono>
#include <functional>
namespace ripple {
namespace reduce_relay {
/** Maintains squelching of relaying messages from validators */
template <typename clock_type>
class Squelch
{
using time_point = typename clock_type::time_point;
public:
explicit Squelch(beast::Journal journal) : journal_(journal)
{
}
virtual ~Squelch() = default;
/** Squelch validation/proposal relaying for the validator
* @param validator The validator's public key
* @param squelchDuration Squelch duration in seconds
* @return false if invalid squelch duration
*/
bool
addSquelch(
PublicKey const& validator,
std::chrono::seconds const& squelchDuration);
/** Remove the squelch
* @param validator The validator's public key
*/
void
removeSquelch(PublicKey const& validator);
/** Remove expired squelch
* @param validator Validator's public key
* @return true if removed or doesn't exist, false if still active
*/
bool
expireSquelch(PublicKey const& validator);
private:
/** Maintains the list of squelched relaying to downstream peers.
* Expiration time is included in the TMSquelch message. */
hash_map<PublicKey, time_point> squelched_;
beast::Journal const journal_;
};
template <typename clock_type>
bool
Squelch<clock_type>::addSquelch(
PublicKey const& validator,
std::chrono::seconds const& squelchDuration)
{
if (squelchDuration >= MIN_UNSQUELCH_EXPIRE &&
squelchDuration <= MAX_UNSQUELCH_EXPIRE_PEERS)
{
squelched_[validator] = clock_type::now() + squelchDuration;
return true;
}
JLOG(journal_.error()) << "squelch: invalid squelch duration "
<< squelchDuration.count();
// unsquelch if invalid duration
removeSquelch(validator);
return false;
}
template <typename clock_type>
void
Squelch<clock_type>::removeSquelch(PublicKey const& validator)
{
squelched_.erase(validator);
}
template <typename clock_type>
bool
Squelch<clock_type>::expireSquelch(PublicKey const& validator)
{
auto now = clock_type::now();
auto const& it = squelched_.find(validator);
if (it == squelched_.end())
return true;
else if (it->second > now)
return false;
// squelch expired
squelched_.erase(it);
return true;
}
} // namespace reduce_relay
} // namespace ripple
#endif // RIPPLED_SQUELCH_H

View File

@@ -0,0 +1,146 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h>
#include <chrono>
namespace ripple {
namespace reduce_relay {
/**
* @brief Manages the temporary suppression ("squelching") of validators.
*
* @details This class provides a mechanism to temporarily ignore messages from
* specific validators for a defined duration. It tracks which
* validators are currently squelched and handles the
* expiration of the squelch period. The use of an
* abstract clock allows for deterministic testing of time-based
* squelch logic.
*/
class SquelchStore
{
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
using time_point = typename clock_type::time_point;
public:
explicit SquelchStore(beast::Journal journal, clock_type& clock)
: journal_(journal), clock_(clock)
{
}
virtual ~SquelchStore() = default;
/**
* @brief Manages the squelch status of a validator.
*
* @details This function acts as the primary public interface for
* controlling a validator's squelch state. Based on the `squelch` flag, it
* either adds a new squelch entry for the specified duration or removes an
* existing one. This function also clears all expired squelches.
*
* @param validator The public key of the validator to manage.
* @param squelch If `true`, the validator will be squelched. If `false`,
* any existing squelch will be removed.
* @param duration The duration in seconds for the squelch. This value is
* only used when `squelch` is `true`.
*/
void
handleSquelch(
PublicKey const& validator,
bool squelch,
std::chrono::seconds duration);
/**
* @brief Checks if a validator is currently squelched.
*
* @details This function checks if the validator's squelch has expired.
*
* @param validator The public key of the validator to check.
* @return `true` if a non-expired squelch entry exists for the
* validator, `false` otherwise.
*/
bool
isSquelched(PublicKey const& validator) const;
// The following field is protected for unit tests.
protected:
/**
* @brief The core data structure mapping a validator's public key to the
* time point when their squelch expires.
*/
hash_map<PublicKey, time_point> squelched_;
private:
/**
* @brief Internal implementation to add or update a squelch entry.
*
* @details Calculates the expiration time point by adding the duration to
* the current time and inserts or overwrites the entry for the validator in
* the `squelched_` map.
*
* @param validator The public key of the validator to squelch.
* @param squelchDuration The duration for which the validator should be
* squelched.
*/
void
add(PublicKey const& validator,
std::chrono::seconds const& squelchDuration);
/**
* @brief Internal implementation to remove a squelch entry.
*
* @details Erases the squelch entry for the given validator from the
* `squelched_` map, effectively unsquelching it.
*
* @param validator The public key of the validator to unsquelch.
*/
void
remove(PublicKey const& validator);
/**
* @brief Internal implementation to remove all expired squelches.
*
* @details Erases all squelch entries whose expiration is in the past.
*/
void
removeExpired();
/**
* @brief The logging interface used by this store.
*/
beast::Journal const journal_;
/**
* @brief A reference to the clock used for all time-based operations,
* allowing for deterministic testing via dependency injection.
*/
clock_type& clock_;
};
} // namespace reduce_relay
} // namespace ripple
#endif // RIPPLED_SQUELCH_H

View File

@@ -24,10 +24,12 @@
#include <xrpld/app/rdb/RelationalDatabase.h>
#include <xrpld/app/rdb/Wallet.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/detail/ConnectAttempt.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/TrafficCount.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/peerfinder/make_Manager.h>
#include <xrpld/rpc/handlers/GetCounts.h>
#include <xrpld/rpc/json_body.h>
@@ -39,9 +41,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/server/SimpleWriter.h>
#include <boost/algorithm/string/predicate.hpp>
#include "xrpld/overlay/detail/TrafficCount.h"
#include <functional>
namespace ripple {
@@ -142,7 +142,7 @@ OverlayImpl::OverlayImpl(
, m_resolver(resolver)
, next_id_(1)
, timer_count_(0)
, slots_(app.logs(), *this, app.config())
, slots_(app.logs(), *this, app.config(), stopwatch())
, m_stats(
std::bind(&OverlayImpl::collect_metrics, this),
collector,
@@ -578,17 +578,22 @@ OverlayImpl::stop()
void
OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
{
beast::PropertyStream::Set set("traffic", stream);
auto const stats = m_traffic.getCounts();
for (auto const& pair : stats)
{
beast::PropertyStream::Map item(set);
item["category"] = pair.second.name;
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
item["messages_out"] = std::to_string(pair.second.messagesOut.load());
beast::PropertyStream::Set set("traffic", stream);
auto const stats = m_traffic.getCounts();
for (auto const& pair : stats)
{
beast::PropertyStream::Map item(set);
item["category"] = pair.second.name;
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
item["messages_out"] =
std::to_string(pair.second.messagesOut.load());
}
}
slots_.onWrite(stream);
}
//------------------------------------------------------------------------------
@@ -1410,12 +1415,24 @@ OverlayImpl::squelch(
}
}
void
OverlayImpl::squelchAll(
PublicKey const& validator,
uint32_t squelchDuration,
std::function<void(Peer::id_t)> report)
{
for_each([&](std::shared_ptr<PeerImp>&& p) {
p->send(makeSquelchMessage(validator, true, squelchDuration));
report(p->id());
});
}
void
OverlayImpl::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
std::set<Peer::id_t>&& peers,
protocol::MessageType type)
bool isTrusted)
{
if (!slots_.baseSquelchReady())
return;
@@ -1428,14 +1445,18 @@ OverlayImpl::updateSlotAndSquelch(
key = key,
validator = validator,
peers = std::move(peers),
type]() mutable {
updateSlotAndSquelch(key, validator, std::move(peers), type);
isTrusted]() mutable {
updateSlotAndSquelch(
key, validator, std::move(peers), isTrusted);
});
for (auto id : peers)
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
slots_.updateSlotAndSquelch(
key,
validator,
id,
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
isTrusted);
}
void
@@ -1443,7 +1464,7 @@ OverlayImpl::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer,
protocol::MessageType type)
bool isTrusted)
{
if (!slots_.baseSquelchReady())
return;
@@ -1452,15 +1473,64 @@ OverlayImpl::updateSlotAndSquelch(
return post(
strand_,
// Must capture copies of reference parameters (i.e. key, validator)
[this, key = key, validator = validator, peer, type]() {
updateSlotAndSquelch(key, validator, peer, type);
[this, key = key, validator = validator, peer, isTrusted]() {
updateSlotAndSquelch(key, validator, peer, isTrusted);
});
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
slots_.updateSlotAndSquelch(
key,
validator,
peer,
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
isTrusted);
}
void
OverlayImpl::updateUntrustedValidatorSlot(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer)
{
if (!slots_.enhancedSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(
strand_,
// Must capture copies of reference parameters (i.e. key, validator)
[this, key = key, validator = validator, peer]() {
updateUntrustedValidatorSlot(key, validator, peer);
});
slots_.updateUntrustedValidatorSlot(key, validator, peer, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
}
void
OverlayImpl::handleUntrustedSquelch(PublicKey const& validator)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&OverlayImpl::handleUntrustedSquelch, this, validator));
auto count = 0;
// we can get the total number of peers with size(), however that would have
// to acquire another lock on peers. Instead, count the number of peers in
// the same loop, as we're already iterating all peers.
auto total = 0;
for_each([&](std::shared_ptr<PeerImp>&& p) {
++total;
if (p->isSquelched(validator))
++count;
});
// if majority of peers squelched the validator
if (count >= total - 1)
slots_.squelchUntrustedValidator(validator);
}
void
OverlayImpl::deletePeer(Peer::id_t id)
{

View File

@@ -24,6 +24,7 @@
#include <xrpld/core/Job.h>
#include <xrpld/overlay/Message.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/Slot.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/TrafficCount.h>
@@ -48,6 +49,7 @@
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
@@ -103,7 +105,7 @@ private:
boost::asio::io_service& io_service_;
std::optional<boost::asio::io_service::work> work_;
boost::asio::io_service::strand strand_;
mutable std::recursive_mutex mutex_; // VFALCO use std::mutex
std::recursive_mutex mutable mutex_; // VFALCO use std::mutex
std::condition_variable_any cond_;
std::weak_ptr<Timer> timer_;
boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;
@@ -122,7 +124,7 @@ private:
std::atomic<uint64_t> peerDisconnects_{0};
std::atomic<uint64_t> peerDisconnectsCharges_{0};
reduce_relay::Slots<UptimeClock> slots_;
reduce_relay::Slots slots_;
// Transaction reduce-relay metrics
metrics::TxMetrics txMetrics_;
@@ -392,35 +394,90 @@ public:
return setup_.networkID;
}
/** Updates message count for validator/peer. Sends TMSquelch if the number
* of messages for N peers reaches threshold T. A message is counted
* if a peer receives the message for the first time and if
* the message has been relayed.
* @param key Unique message's key
* @param validator Validator's public key
* @param peers Peers' id to update the slots for
* @param type Received protocol message type
/**
* @brief Processes a message from a validator received via multiple peers.
*
* @details This function serves as a thread-safe entry point to the
* squelching system.
*
* @param key The unique hash of the message.
* @param validator The public key of the validator.
* @param peers A set of peer IDs that relayed this message.
* @param isTrusted `true` if the message is from a trusted validator.
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
std::set<Peer::id_t>&& peers,
protocol::MessageType type);
bool isTrusted);
/** Overload to reduce allocation in case of single peer
/**
* @brief Processes a message from a validator received via a single peer.
*
* @details This function is a thread-safe entry point for handling a
* message from a single peer. It ensures the squelching feature is ready
* and serializes the call onto the `strand_`. It then invokes the
* underlying `Slots::updateSlotAndSquelch` method to process the message.
*
* @param key The unique hash of the message.
* @param validator The public key of the validator.
* @param peer The ID of the peer that relayed this message.
* @param isTrusted `true` if the message is from a trusted validator.
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer,
protocol::MessageType type);
bool isTrusted);
/** Called when the peer is deleted. If the peer was selected to be the
* source of messages from the validator then squelched peers have to be
* unsquelched.
* @param id Peer's id
/**
* @brief Processes a message specifically for the untrusted validator slot
* logic.
*
* @details This function is the thread-safe entry point for the enhanced
* squelching feature, which manages a limited number of slots for
* untrusted validators. It ensures the feature is ready, posts the work to
* the `strand_`, and then calls the underlying
* `Slots::updateUntrustedValidatorSlot` to handle the slot admission and
* evaluation logic.
*
* @param key The unique hash of the message.
* @param validator The public key of the untrusted validator.
* @param peer The ID of the peer that relayed this message.
*/
void
updateUntrustedValidatorSlot(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer);
/**
* @brief Handles a squelch message for an untrusted validator.
*
* @details This function is called when this node receives a message
* indicating that a peer is squelching an untrusted validator. It
* tallies how many of its own connected peers have also squelched the
* validator. If a majority of peers agree, this node takes definitive local
* action by calling `Slots::squelchUntrustedValidator`, effectively joining
* the consensus to silence the validator.
*
* @param validator The public key of the untrusted validator being
* squelched.
*/
void
handleUntrustedSquelch(PublicKey const& validator);
/**
* @brief Handles the deletion of a peer from the overlay network.
*
* @details This function provides a thread-safe entry point for removing a
* peer. It ensures the operation is executed on the correct strand and
* then delegates the logic to `Slots::deletePeer`, which notifies all
* active slots about the peer's removal.
*
* @param id The ID of the peer to be deleted.
*/
void
deletePeer(Peer::id_t id);
@@ -451,6 +508,12 @@ private:
Peer::id_t const id,
std::uint32_t squelchDuration) const override;
void
squelchAll(
PublicKey const& validator,
std::uint32_t squelchDuration,
std::function<void(Peer::id_t)>) override;
void
unsquelch(PublicKey const& validator, Peer::id_t id) const override;
@@ -477,7 +540,7 @@ private:
/** Handles validator list requests.
Using a /vl/<hex-encoded public key> URL, will retrieve the
latest valdiator list (or UNL) that this node has for that
latest validator list (or UNL) that this node has for that
public key, if the node trusts that public key.
@return true if the request was handled.
@@ -555,8 +618,13 @@ private:
void
sendTxQueue();
/** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator */
/**
* @brief Triggers the cleanup of idle peers and stale slots.
*
* @details This function is a thread-safe wrapper that executes
* `Slots::deleteIdlePeers` to perform the necessary cleanup of inactive
* peers, stale slots, and unviable validator candidates.
*/
void
deleteIdlePeers();

View File

@@ -29,6 +29,7 @@
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>
@@ -44,6 +45,7 @@
#include <boost/beast/core/ostream.hpp>
#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
@@ -95,7 +97,7 @@ PeerImp::PeerImp(
, publicKey_(publicKey)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, squelchStore_(app_.journal("SquelchStore"), stopwatch())
, usage_(consumer)
, fee_{Resource::feeTrivialPeer, ""}
, slot_(slot)
@@ -246,8 +248,8 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (detaching_)
return;
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
auto const validator = m->getValidatorKey();
if (validator && isSquelched(*validator))
{
overlay_.reportOutboundTraffic(
TrafficCount::category::squelch_suppressed,
@@ -265,7 +267,7 @@ PeerImp::send(std::shared_ptr<Message> const& m)
TrafficCount::category::total,
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
auto sendq_size = send_queue_.size();
auto const sendq_size = send_queue_.size();
if (sendq_size < Tuning::targetSendQueue)
{
@@ -570,6 +572,12 @@ PeerImp::hasRange(std::uint32_t uMin, std::uint32_t uMax)
(uMax <= maxLedger_);
}
bool
PeerImp::isSquelched(PublicKey const& validator) const
{
return squelchStore_.isSquelched(validator);
}
//------------------------------------------------------------------------------
void
@@ -1699,21 +1707,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
// suppression for 30 seconds to avoid doing a relatively expensive lookup
// every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped then
// this happens here I.e. before further wasting CPU verifying the signature
// of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
}
uint256 const proposeHash{set.currenttxhash()};
uint256 const prevLedger{set.previousledger()};
@@ -1728,15 +1721,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
publicKey.slice(),
sig);
if (auto [added, relayed] =
auto const isTrusted = app_.validators().trusted(publicKey);
if (auto const& [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (relayed &&
(stopwatch().now() - *relayed) < reduce_relay::PEER_IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
suppression, publicKey, id_, isTrusted);
// report duplicate proposal messages
overlay_.reportInboundTraffic(
@@ -1750,6 +1746,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
if (!isTrusted)
{
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug())
@@ -2358,20 +2364,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
auto const isTrusted =
app_.validators().trusted(val->getSignerPublic());
// If the operator has specified that untrusted validations be
// dropped then this happens here I.e. before further wasting CPU
// verifying the signature of an untrusted key
if (!isTrusted)
{
// increase untrusted validations received
overlay_.reportInboundTraffic(
TrafficCount::category::validation_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
return;
}
auto key = sha512Half(makeSlice(m->validation()));
auto [added, relayed] =
@@ -2382,9 +2374,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (relayed &&
(stopwatch().now() - *relayed) < reduce_relay::PEER_IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
key, val->getSignerPublic(), id_, isTrusted);
// increase duplicate validations received
overlay_.reportInboundTraffic(
@@ -2395,6 +2388,25 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
// at this point the message is guaranteed to be unique
if (!isTrusted)
{
overlay_.reportInboundTraffic(
TrafficCount::category::validation_untrusted,
Message::messageSize(*m));
// If the operator has specified that untrusted validations be
// dropped then this happens here I.e. before further wasting CPU
// verifying the signature of an untrusted key
// TODO: Deprecate RELAY_UNTRUSTED_VALIDATIONS config once enhanced
// squelching is the defacto routing algorithm.
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
return;
overlay_.updateUntrustedValidatorSlot(
key, val->getSignerPublic(), id_);
}
if (!isTrusted && (tracking_.load() == Tracking::diverged))
{
JLOG(p_journal_.debug())
@@ -2704,6 +2716,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
fee_.update(Resource::feeInvalidData, "squelch no pubkey");
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
@@ -2721,15 +2734,27 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
return;
}
std::uint32_t duration =
m->has_squelchduration() ? m->squelchduration() : 0;
if (!m->squelch())
squelch_.removeSquelch(key);
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
auto duration = std::chrono::seconds{
m->has_squelchduration() ? m->squelchduration() : 0};
if (m->squelch() &&
(duration < reduce_relay::MIN_UNSQUELCH_EXPIRE ||
duration > reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS))
{
fee_.update(Resource::feeInvalidData, "squelch duration");
return;
}
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
<< "onMessage: TMSquelch " << (!m->squelch() ? "un" : "")
<< "squelch message; validator: " << slice << "peer: " << id()
<< " duration: " << duration.count();
squelchStore_.handleSquelch(key, m->squelch(), duration);
// if the squelch is for an untrusted validator
if (m->squelch() && !app_.validators().trusted(key))
overlay_.handleUntrustedSquelch(key);
}
//--------------------------------------------------------------------------
@@ -3013,7 +3038,7 @@ PeerImp::checkPropose(
peerPos.suppressionID(),
peerPos.publicKey(),
std::move(haveMessage),
protocol::mtPROPOSE_LEDGER);
isTrusted);
}
}
@@ -3049,7 +3074,7 @@ PeerImp::checkValidation(
key,
val->getSignerPublic(),
std::move(haveMessage),
protocol::mtVALIDATION);
val->isTrusted());
}
}
}

View File

@@ -23,7 +23,8 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/detail/LedgerReplayMsgHandler.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/Squelch.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/SquelchStore.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/ProtocolVersion.h>
#include <xrpld/peerfinder/PeerfinderManager.h>
@@ -116,7 +117,7 @@ private:
clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
reduce_relay::SquelchStore squelchStore_;
// Notes on thread locking:
//
@@ -440,6 +441,13 @@ public:
return txReduceRelayEnabled_;
}
/** Check if a given validator is squelched.
* @param validator Validator's public key
* @return true if squelch exists and it is not expired. False otherwise.
*/
bool
isSquelched(PublicKey const& validator) const;
private:
void
close();
@@ -680,7 +688,7 @@ PeerImp::PeerImp(
, publicKey_(publicKey)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, squelchStore_(app_.journal("SquelchStore"), stopwatch())
, usage_(usage)
, fee_{Resource::feeTrivialPeer}
, slot_(std::move(slot))

View File

@@ -0,0 +1,722 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/Slot.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/UnorderedContainers.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/container/aged_unordered_map.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/PropertyStream.h>
#include <xrpl/protocol/PublicKey.h>
#include <chrono>
#include <cstddef>
#include <optional>
#include <sstream>
#include <string>
#include <vector>
namespace ripple {
namespace reduce_relay {
void
Slot::deleteIdlePeer(PublicKey const& validator)
{
using namespace std::chrono;
auto const now = clock_.now();
for (auto it = peers_.begin(); it != peers_.end();)
{
auto const& peer = it->second;
auto const id = it->first;
++it;
if (now - peer.lastMessage > reduce_relay::PEER_IDLED)
{
JLOG(journal_.trace())
<< "deleteIdlePeer: deleting idle peer "
<< formatLogMessage(validator, id)
<< " peer_state: " << to_string(peer.state)
<< " idle for: " << (now - peer.lastMessage).count();
deletePeer(validator, id, false);
}
}
}
void
Slot::update(
PublicKey const& validator,
Peer::id_t id,
ignored_squelch_callback report)
{
using namespace std::chrono;
auto const now = clock_.now();
auto const it = peers_.find(id);
// First message from this peer
if (it == peers_.end())
{
JLOG(journal_.trace())
<< "update: adding new slot" << formatLogMessage(validator, id);
peers_.emplace(std::make_pair(
id,
PeerInfo{
.state = PeerState::Counting,
.count = 0,
.expire = now,
.lastMessage = now,
.timesSelected = 0}));
initCounting();
return;
}
// Message from a peer with expired squelch
if (it->second.state == PeerState::Squelched && now > it->second.expire)
{
JLOG(journal_.trace())
<< "update: squelch expired" << formatLogMessage(validator, id);
it->second.state = PeerState::Counting;
it->second.lastMessage = now;
initCounting();
return;
}
auto& peer = it->second;
peer.lastMessage = now;
// report if we received a message from a squelched peer
if (peer.state == PeerState::Squelched)
report();
if (getState() != SlotState::Counting || peer.state == PeerState::Squelched)
return;
if (++peer.count > reduce_relay::MIN_MESSAGE_THRESHOLD)
considered_.insert(id);
if (peer.count == (reduce_relay::MAX_MESSAGE_THRESHOLD + 1))
++reachedThreshold_;
if (now - lastSelected_ > 2 * reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT)
{
JLOG(journal_.warn())
<< "update: resetting due to inactivity"
<< formatLogMessage(validator, id) << " inactive for: "
<< duration_cast<seconds>(now - lastSelected_).count();
initCounting();
return;
}
if (reachedThreshold_ == maxSelectedPeers_)
{
// Randomly select maxSelectedPeers_ peers from considered.
// Exclude peers that have been idling > IDLED -
// it's possible that deleteIdlePeer() has not been called yet.
// If number of remaining peers != maxSelectedPeers_
// then reset the Counting state and let deleteIdlePeer() handle
// idled peers.
std::unordered_set<Peer::id_t> selected;
std::stringstream str;
while (selected.size() != maxSelectedPeers_ && considered_.size() != 0)
{
auto const i =
considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
auto const it = std::next(considered_.begin(), i);
auto const id = *it;
considered_.erase(it);
auto const& peersIt = peers_.find(id);
if (peersIt == peers_.end())
{
JLOG(journal_.error()) << "update: peer not found"
<< formatLogMessage(validator, id);
continue;
}
if (now - peersIt->second.lastMessage < reduce_relay::PEER_IDLED)
{
selected.insert(id);
str << id << " ";
}
}
if (selected.size() != maxSelectedPeers_)
{
JLOG(journal_.error()) << "update: selection failed"
<< formatLogMessage(validator, std::nullopt);
initCounting();
return;
}
lastSelected_ = now;
JLOG(journal_.trace()) << "update: selected peers "
<< formatLogMessage(validator, std::nullopt)
<< " peers: " << str.str();
XRPL_ASSERT(
peers_.size() >= maxSelectedPeers_,
"ripple::reduce_relay::Slot::update : minimum peers");
// squelch peers which are not selected and
// not already squelched
str.clear();
for (auto& [k, v] : peers_)
{
v.count = 0;
if (selected.find(k) != selected.end())
{
v.state = PeerState::Selected;
++v.timesSelected;
}
else if (v.state != PeerState::Squelched)
{
if (journal_.trace())
str << k << " ";
v.state = PeerState::Squelched;
std::chrono::seconds duration =
getSquelchDuration(peers_.size() - maxSelectedPeers_);
v.expire = now + duration;
handler_.squelch(validator, k, duration.count());
}
}
JLOG(journal_.trace()) << "update: squelched peers "
<< formatLogMessage(validator, std::nullopt)
<< " peers: " << str.str();
considered_.clear();
reachedThreshold_ = 0;
state_ = SlotState::Selected;
}
}
std::chrono::seconds
Slot::getSquelchDuration(std::size_t npeers) const
{
using namespace std::chrono;
auto m = std::max(
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT,
seconds{reduce_relay::SQUELCH_PER_PEER * npeers});
if (m > reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS)
{
m = reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS;
JLOG(journal_.warn())
<< "getSquelchDuration: unexpected squelch duration " << npeers;
}
return seconds{
ripple::rand_int(reduce_relay::MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)};
}
void
Slot::deletePeer(PublicKey const& validator, Peer::id_t id, bool erase)
{
auto it = peers_.find(id);
if (it == peers_.end())
return;
std::vector<Peer::id_t> toUnsquelch;
auto const now = clock_.now();
if (it->second.state == PeerState::Selected)
{
JLOG(journal_.debug())
<< "deletePeer: unsquelching selected peer "
<< formatLogMessage(validator, id)
<< " peer_state: " << to_string(it->second.state)
<< " considered: " << (considered_.find(id) != considered_.end())
<< " erase: " << erase;
for (auto& [k, v] : peers_)
{
if (v.state == PeerState::Squelched)
toUnsquelch.push_back(k);
v.state = PeerState::Counting;
v.count = 0;
v.expire = now;
}
considered_.clear();
reachedThreshold_ = 0;
state_ = SlotState::Counting;
}
else if (considered_.contains(id))
{
if (it->second.count > reduce_relay::MAX_MESSAGE_THRESHOLD)
--reachedThreshold_;
considered_.erase(id);
}
it->second.lastMessage = now;
it->second.count = 0;
if (erase)
peers_.erase(it);
// Must be after peers_.erase(it)
for (auto const& k : toUnsquelch)
handler_.unsquelch(validator, k);
}
void
Slot::onWrite(beast::PropertyStream::Map& stream) const
{
auto const now = clock_.now();
stream["state"] = to_string(getState());
stream["reachedThreshold"] = reachedThreshold_;
stream["considered"] = considered_.size();
stream["lastSelected"] =
duration_cast<std::chrono::seconds>(now - lastSelected_).count();
stream["isTrusted"] = isTrusted_;
beast::PropertyStream::Set peers("peers", stream);
for (auto const& [id, info] : peers_)
{
beast::PropertyStream::Map item(peers);
item["id"] = id;
item["count"] = info.count;
item["expire"] =
duration_cast<std::chrono::seconds>(info.expire - now).count();
item["lastMessage"] =
duration_cast<std::chrono::seconds>(now - info.lastMessage).count();
item["timesSelected"] = info.timesSelected;
item["state"] = to_string(info.state);
}
}
void
Slot::initCounting()
{
state_ = SlotState::Counting;
considered_.clear();
reachedThreshold_ = 0;
for (auto& [_, peer] : peers_)
{
(void)_;
peer.count = 0;
}
}
std::string
Slot::formatLogMessage(PublicKey const& validator, std::optional<Peer::id_t> id)
const
{
std::stringstream ss;
ss << "validator: " << toBase58(TokenType::NodePublic, validator);
if (id)
ss << " peer: " << *id;
ss << " trusted: " << isTrusted_;
ss << " slot_state: " << to_string(getState());
return ss.str();
}
// --------------------------------- Slots --------------------------------- //
bool
Slots::reduceRelayReady()
{
if (!reduceRelayReady_)
reduceRelayReady_ =
std::chrono::duration_cast<std::chrono::minutes>(
clock_.now().time_since_epoch()) > reduce_relay::WAIT_ON_BOOTUP;
return reduceRelayReady_;
}
void
Slots::registerSquelchedValidator(
PublicKey const& validatorKey,
Peer::id_t peerID)
{
peersWithSquelchedValidators_[validatorKey].insert(peerID);
}
bool
Slots::expireAndIsValidatorSquelched(PublicKey const& validatorKey)
{
beast::expire(
peersWithSquelchedValidators_,
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
return peersWithSquelchedValidators_.find(validatorKey) !=
peersWithSquelchedValidators_.end();
}
bool
Slots::expireAndIsPeerSquelched(
PublicKey const& validatorKey,
Peer::id_t peerID)
{
beast::expire(
peersWithSquelchedValidators_,
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
auto const it = peersWithSquelchedValidators_.find(validatorKey);
// if validator was not squelched, the peer was also not squelched
if (it == peersWithSquelchedValidators_.end())
return false;
// if a peer is found the squelch for it has not expired
return it->second.find(peerID) != it->second.end();
}
bool
Slots::expireAndIsPeerMessageCached(uint256 const& key, Peer::id_t id)
{
beast::expire(peersWithMessage_, reduce_relay::PEER_IDLED);
// return false if the ID was not inserted
if (key.isNonZero())
return !peersWithMessage_[key].insert(id).second;
return false;
}
void
Slots::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
Peer::id_t id,
typename Slot::ignored_squelch_callback report,
bool isTrusted)
{
if (expireAndIsPeerMessageCached(key, id))
return;
// If we receive a message from a trusted validator either update an
// existing slot or insert a new one. If we are not running enhanced
// squelching also deduplicate untrusted validator messages
if (isTrusted || !enhancedSquelchEnabled_)
{
// if enhanced squelching is disabled, keep untrusted validator slots
// separately from trusted ones
auto it = (isTrusted ? trustedSlots_ : untrustedSlots_)
.emplace(std::make_pair(
validator,
Slot(
handler_,
logs_.journal("Slot"),
maxSelectedPeers_,
isTrusted,
clock_)))
.first;
it->second.update(validator, id, report);
}
else
{
auto it = untrustedSlots_.find(validator);
// If we received a message from a validator that is not
// selected, and is not squelched, there is nothing to do. It
// will be squelched later when `updateValidatorSlot` is called.
if (it == untrustedSlots_.end())
return;
it->second.update(validator, id, report);
}
}
void
Slots::updateUntrustedValidatorSlot(
uint256 const& key,
PublicKey const& validator,
Peer::id_t id,
typename Slot::ignored_squelch_callback report)
{
// We received a message from an already selected validator
// we can ignore this message
if (untrustedSlots_.find(validator) != untrustedSlots_.end())
return;
// Did we receive a message from an already squelched validator?
// This could happen in few cases:
// 1. It happened so that the squelch for a particular peer expired
// before our local squelch.
// 2. We receive a message from a new peer that did not receive the
// squelch request.
// 3. The peer is ignoring our squelch request and we have not sent
// the control message in a while.
// In all of these cases we can only send them a squelch request again.
if (expireAndIsValidatorSquelched(validator))
{
if (!expireAndIsPeerSquelched(validator, id))
{
JLOG(journal_.debug())
<< "updateUntrustedValidatorSlot: received a message from a "
"squelched validator "
<< "validator: " << toBase58(TokenType::NodePublic, validator)
<< " peer: " << id;
registerSquelchedValidator(validator, id);
handler_.squelch(
validator,
id,
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
}
return;
}
// Do we have any available slots for additional untrusted validators?
// This could happen in few cases:
// 1. We received a message from a new untrusted validator, but we
// are at capacity.
// 2. We received a message from a previously squelched validator.
// In all of these cases we send a squelch message to all peers.
// The validator may still be considered by the selector. However, it
// will be eventually cleaned and squelched
if (untrustedSlots_.size() == reduce_relay::MAX_UNTRUSTED_SLOTS)
{
JLOG(journal_.debug())
<< "updateUntrustedValidatorSlot: slots full squelching validator "
<< "validator: " << toBase58(TokenType::NodePublic, validator);
handler_.squelchAll(
validator,
MAX_UNSQUELCH_EXPIRE_DEFAULT.count(),
[&](Peer::id_t id) { registerSquelchedValidator(validator, id); });
return;
}
if (auto const v = updateConsideredValidator(validator, id))
{
JLOG(journal_.debug())
<< "updateUntrustedValidatorSlot: selected untrusted validator "
<< "validator: " << toBase58(TokenType::NodePublic, *v);
untrustedSlots_.emplace(std::make_pair(
*v,
Slot(
handler_,
logs_.journal("Slot"),
maxSelectedPeers_,
false,
clock_)));
}
// When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them.
// Since we stop updating their counters, they will idle, and will be
// removed and squelched.
}
std::optional<PublicKey>
Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer)
{
auto const now = clock_.now();
auto it = consideredValidators_.find(validator);
if (it == consideredValidators_.end())
{
consideredValidators_.emplace(std::make_pair(
validator,
ValidatorInfo{
.count = 1,
.lastMessage = now,
.peers = {peer},
}));
return std::nullopt;
}
it->second.peers.insert(peer);
it->second.lastMessage = now;
++it->second.count;
// if the validator has not met selection criteria yet
if (it->second.count < reduce_relay::MAX_MESSAGE_THRESHOLD)
return std::nullopt;
auto const key = it->first;
consideredValidators_.erase(it);
return key;
}
void
Slots::squelchUntrustedValidator(PublicKey const& validator)
{
JLOG(journal_.info())
<< "squelchUntrustedValidator: squelching untrusted validator: "
<< toBase58(TokenType::NodePublic, validator);
// to prevent the validator from being reinserted squelch the validator
// before removing the validator from consideration and slots
handler_.squelchAll(
validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), [&](Peer::id_t id) {
registerSquelchedValidator(validator, id);
});
consideredValidators_.erase(validator);
untrustedSlots_.erase(validator);
}
void
Slots::deletePeer(Peer::id_t id, bool erase)
{
auto const f = [&](slots_map& slots) {
for (auto& [validator, slot] : slots)
slot.deletePeer(validator, id, erase);
};
f(trustedSlots_);
f(untrustedSlots_);
}
void
Slots::deleteIdlePeers()
{
auto const f = [&](slots_map& slots) {
auto const now = clock_.now();
for (auto it = slots.begin(); it != slots.end();)
{
auto const& validator = it->first;
auto& slot = it->second;
slot.deleteIdlePeer(validator);
// delete the slot if the untrusted slot no longer meets the
// selection critera or it has not been selected for a while
if ((!slot.isTrusted_ &&
slot.getPeers().size() < maxSelectedPeers_) ||
now - it->second.getLastSelected() >
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT)
{
JLOG(journal_.trace())
<< "deleteIdlePeers: deleting "
<< (slot.isTrusted_ ? "trusted" : "untrusted") << " slot "
<< toBase58(TokenType::NodePublic, it->first) << " reason: "
<< (now - it->second.getLastSelected() >
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT
? " inactive "
: " insufficient peers");
// if an untrusted validator slot idled - peers stopped
// sending messages for this validator squelch it
if (!it->second.isTrusted_)
handler_.squelchAll(
it->first,
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count(),
[&](Peer::id_t id) {
registerSquelchedValidator(it->first, id);
});
it = slots.erase(it);
}
else
++it;
}
};
f(trustedSlots_);
f(untrustedSlots_);
// remove and squelch all validators that the selector deemed unsuitable
// there might be some good validators in this set that "lapsed".
// However, since these are untrusted validators we're not concerned
for (auto const& validator : cleanConsideredValidators())
handler_.squelchAll(
validator,
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count(),
[&](Peer::id_t id) { registerSquelchedValidator(validator, id); });
}
std::vector<PublicKey>
Slots::cleanConsideredValidators()
{
auto const now = clock_.now();
std::vector<PublicKey> keys;
std::stringstream ss;
for (auto it = consideredValidators_.begin();
it != consideredValidators_.end();)
{
if (now - it->second.lastMessage >
reduce_relay::MAX_UNTRUSTED_VALIDATOR_IDLE)
{
keys.push_back(it->first);
ss << " " << toBase58(TokenType::NodePublic, it->first);
it = consideredValidators_.erase(it);
}
// Due to some reason the validator idled, reset their progress
else if (now - it->second.lastMessage > reduce_relay::PEER_IDLED)
{
it->second.reset();
++it;
}
else
++it;
}
if (keys.size() > 0)
{
JLOG(journal_.info())
<< "cleanConsideredValidators: removed considered validators "
<< ss.str();
}
return keys;
}
void
Slots::onWrite(beast::PropertyStream::Map& stream) const
{
auto const writeSlot = [](beast::PropertyStream::Set& set,
hash_map<PublicKey, Slot> const& slots) {
for (auto const& [validator, slot] : slots)
{
beast::PropertyStream::Map item(set);
item["validator"] = toBase58(TokenType::NodePublic, validator);
slot.onWrite(item);
}
};
beast::PropertyStream::Map slots("slots", stream);
{
beast::PropertyStream::Set set("trusted", slots);
writeSlot(set, trustedSlots_);
}
{
beast::PropertyStream::Set set("untrusted", slots);
writeSlot(set, untrustedSlots_);
}
{
beast::PropertyStream::Set set("considered", slots);
auto const now = clock_.now();
for (auto const& [validator, info] : consideredValidators_)
{
beast::PropertyStream::Map item(set);
item["validator"] = toBase58(TokenType::NodePublic, validator);
item["lastMessage"] =
std::chrono::duration_cast<std::chrono::seconds>(
now - info.lastMessage)
.count();
item["messageCount"] = info.count;
item["peers"] = info.peers.size();
}
}
}
} // namespace reduce_relay
} // namespace ripple

View File

@@ -0,0 +1,101 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/SquelchStore.h>
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h>
#include <chrono>
#include <unordered_map>
namespace ripple {
namespace reduce_relay {
void
SquelchStore::handleSquelch(
PublicKey const& validator,
bool squelch,
std::chrono::seconds duration)
{
// Remove all expired squelches. This call is here, as it is on the least
// critical execution path, that does not require periodic cleanup calls.
removeExpired();
if (squelch)
{
// This should never trigger. The squelch duration is validated in
// PeerImp.onMessage(TMSquelch). However, if somehow invalid duration is
// passed, log is as an error
if ((duration < reduce_relay::MIN_UNSQUELCH_EXPIRE ||
duration > reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS))
{
JLOG(journal_.error())
<< "SquelchStore: invalid squelch duration validator: "
<< Slice(validator) << " duration: " << duration.count();
return;
}
add(validator, duration);
return;
}
remove(validator);
}
bool
SquelchStore::isSquelched(PublicKey const& validator) const
{
auto const now = clock_.now();
auto const it = squelched_.find(validator);
if (it == squelched_.end())
return false;
return it->second > now;
}
void
SquelchStore::add(
PublicKey const& validator,
std::chrono::seconds const& duration)
{
squelched_[validator] = clock_.now() + duration;
}
void
SquelchStore::remove(PublicKey const& validator)
{
squelched_.erase(validator);
}
void
SquelchStore::removeExpired()
{
auto const now = clock_.now();
std::erase_if(
squelched_, [&](auto const& entry) { return entry.second < now; });
}
} // namespace reduce_relay
} // namespace ripple