Compare commits

...

15 Commits

Author SHA1 Message Date
Vito
c142e1d86b better early validation logging 2025-06-03 11:43:49 +02:00
Vito
feef06393a extra logging 2025-06-02 23:41:27 +02:00
Vito
e80f46f296 extra peer logging 2025-06-02 22:14:14 +02:00
Vito
72b19a1c89 better logging 2025-06-02 11:53:20 +02:00
Vito
2a9c38693a fixes unittests for windows 2025-05-28 17:08:48 +02:00
Vito Tumas
261cf0c74c Merge branch 'develop' into tapanito/feature/enhanced-squelching 2025-05-28 16:38:16 +02:00
Vito
a038b70bf4 removes member functions from Slots used only for testing, and moves them to tests 2025-05-28 16:35:41 +02:00
Vito
d4c6910c8b adds enhanced squelching tests 2025-05-28 16:33:42 +02:00
Vito
9ecb457e55 adds methods to print slot details from command 2025-05-28 16:30:59 +02:00
Vito
71871bb9b6 feature: extend squelching to suppress untrusted validator traffic
This feature improves network efficiency by limiting message propagation from untrusted validators.

Squelching currently reduces the volume of duplicate messages from validators but does not address the volume of unique messages from untrusted validators, who may not contribute meaningfully to network progress.

This change introduces a bounded number of slots for untrusted validators, selected based on message frequency. Once selected, their duplicate messages are subject to standard squelching logic, thereby reducing overall message overhead without impacting trusted validator performance.
2025-05-28 16:29:33 +02:00
Vito
ba536ebfd8 adds data strcutures and methods to track red validators 2025-05-28 16:25:15 +02:00
Vito
1e02961a63 adds method to SquelchHandler to squelch all peers 2025-05-28 16:21:47 +02:00
Vito
5613dab898 adds methods to track which peers and validators were squelched 2025-05-28 16:20:01 +02:00
Vito
69aec23e1b adds isTrusted parameter to updateSlotAndSquelch method to differentiate messages from trusted validators 2025-05-28 16:09:23 +02:00
Vito
34c3591554 adds config option for enhanced squelching 2025-05-28 15:57:10 +02:00
11 changed files with 1584 additions and 352 deletions

View File

@@ -17,8 +17,8 @@
*/
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/Env.h>
#include <test/overlay/clock.h>
#include <xrpld/overlay/Message.h>
#include <xrpld/overlay/Peer.h>
@@ -33,8 +33,8 @@
#include <boost/thread.hpp>
#include <chrono>
#include <iostream>
#include <algorithm>
#include <iterator>
#include <numeric>
#include <optional>
@@ -191,52 +191,6 @@ public:
}
};
/** Manually advanced clock. */
class ManualClock
{
public:
typedef uint64_t rep;
typedef std::milli period;
typedef std::chrono::duration<std::uint32_t, period> duration;
typedef std::chrono::time_point<ManualClock> time_point;
inline static bool const is_steady = false;
static void
advance(duration d) noexcept
{
now_ += d;
}
static void
randAdvance(milliseconds min, milliseconds max)
{
now_ += randDuration(min, max);
}
static void
reset() noexcept
{
now_ = time_point(seconds(0));
}
static time_point
now() noexcept
{
return now_;
}
static duration
randDuration(milliseconds min, milliseconds max)
{
return duration(milliseconds(rand_int(min.count(), max.count())));
}
explicit ManualClock() = default;
private:
inline static time_point now_ = time_point(seconds(0));
};
/** Simulate server's OverlayImpl */
class Overlay
{
@@ -249,8 +203,7 @@ public:
uint256 const& key,
PublicKey const& validator,
Peer::id_t id,
SquelchCB f,
protocol::MessageType type = protocol::mtVALIDATION) = 0;
SquelchCB f) = 0;
virtual void deleteIdlePeers(UnsquelchCB) = 0;
@@ -538,8 +491,14 @@ public:
std::uint16_t
inState(PublicKey const& validator, reduce_relay::PeerState state)
{
auto res = slots_.inState(validator, state);
return res ? *res : 0;
auto const& it = slots_.slots_.find(validator);
if (it != slots_.slots_.end())
return std::count_if(
it->second.peers_.begin(),
it->second.peers_.end(),
[&](auto const& it) { return (it.second.state == state); });
return 0;
}
void
@@ -547,11 +506,10 @@ public:
uint256 const& key,
PublicKey const& validator,
Peer::id_t id,
SquelchCB f,
protocol::MessageType type = protocol::mtVALIDATION) override
SquelchCB f) override
{
squelch_ = f;
slots_.updateSlotAndSquelch(key, validator, id, type);
slots_.updateSlotAndSquelch(key, validator, id, true);
}
void
@@ -632,40 +590,56 @@ public:
bool
isCountingState(PublicKey const& validator)
{
return slots_.inState(validator, reduce_relay::SlotState::Counting);
auto const& it = slots_.slots_.find(validator);
if (it != slots_.slots_.end())
return it->second.state_ == reduce_relay::SlotState::Counting;
return false;
}
std::set<id_t>
getSelected(PublicKey const& validator)
{
return slots_.getSelected(validator);
auto const& it = slots_.slots_.find(validator);
if (it == slots_.slots_.end())
return {};
std::set<id_t> r;
for (auto const& [id, info] : it->second.peers_)
if (info.state == reduce_relay::PeerState::Selected)
r.insert(id);
return r;
}
bool
isSelected(PublicKey const& validator, Peer::id_t peer)
{
auto selected = slots_.getSelected(validator);
auto selected = getSelected(validator);
return selected.find(peer) != selected.end();
}
id_t
getSelectedPeer(PublicKey const& validator)
{
auto selected = slots_.getSelected(validator);
auto selected = getSelected(validator);
assert(selected.size());
return *selected.begin();
}
std::unordered_map<
id_t,
std::tuple<
reduce_relay::PeerState,
std::uint16_t,
std::uint32_t,
std::uint32_t>>
std::unordered_map<id_t, reduce_relay::Slot<clock_type>::PeerInfo>
getPeers(PublicKey const& validator)
{
return slots_.getPeers(validator);
auto const& it = slots_.slots_.find(validator);
if (it == slots_.slots_.end())
return {};
auto r = std::
unordered_map<id_t, reduce_relay::Slot<clock_type>::PeerInfo>();
for (auto const& [id, info] : it->second.peers_)
r.emplace(std::make_pair(id, info));
return r;
}
std::uint16_t
@@ -684,6 +658,13 @@ private:
if (auto it = peers_.find(id); it != peers_.end())
squelch_(validator, it->second, squelchDuration);
}
void
squelchAll(PublicKey const& validator, std::uint32_t duration) override
{
for (auto const& [id, peer] : peers_)
squelch_(validator, peer, duration);
}
void
unsquelch(PublicKey const& validator, Peer::id_t id) const override
{
@@ -874,8 +855,7 @@ public:
for (auto& [_, v] : peers)
{
(void)_;
if (std::get<reduce_relay::PeerState>(v) ==
reduce_relay::PeerState::Squelched)
if (v.state == reduce_relay::PeerState::Squelched)
return false;
}
}
@@ -887,7 +867,7 @@ private:
std::vector<Validator> validators_;
};
class reduce_relay_test : public beast::unit_test::suite
class base_squelch_test : public beast::unit_test::suite
{
using Slot = reduce_relay::Slot<ManualClock>;
using id_t = Peer::id_t;
@@ -901,8 +881,7 @@ protected:
<< "num peers " << (int)network_.overlay().getNumPeers()
<< std::endl;
for (auto& [k, v] : peers)
std::cout << k << ":" << (int)std::get<reduce_relay::PeerState>(v)
<< " ";
std::cout << k << ":" << to_string(v.state) << " ";
std::cout << std::endl;
}
@@ -1074,7 +1053,10 @@ protected:
network_.overlay().isSelected(*event.key_, event.peer_);
auto peers = network_.overlay().getPeers(*event.key_);
auto d = reduce_relay::epoch<milliseconds>(now).count() -
std::get<3>(peers[event.peer_]);
reduce_relay::epoch<milliseconds>(
peers[event.peer_].lastMessage)
.count();
mustHandle = event.isSelected_ &&
d > milliseconds(reduce_relay::IDLED).count() &&
network_.overlay().inState(
@@ -1296,7 +1278,7 @@ protected:
[&](PublicKey const& key, PeerWPtr const& peer) {
unsquelched++;
});
auto peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(
unsquelched ==
MAX_PEERS -
@@ -1317,8 +1299,7 @@ protected:
BEAST_EXPECT(propagateAndSquelch(log, true, false));
auto peers = network_.overlay().getPeers(network_.validator(0));
auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) {
return std::get<reduce_relay::PeerState>(it.second) ==
reduce_relay::PeerState::Squelched;
return it.second.state == reduce_relay::PeerState::Squelched;
});
assert(it != peers.end());
std::uint16_t unsquelched = 0;
@@ -1514,7 +1495,7 @@ vp_base_squelch_max_selected_peers=2
auto peers = network_.overlay().getPeers(network_.validator(0));
// first message changes Slot state to Counting and is not counted,
// hence '-1'.
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
BEAST_EXPECT(peers[0].count == (nMessages - 1));
// add duplicate
uint256 key(nMessages - 1);
network_.overlay().updateSlotAndSquelch(
@@ -1524,7 +1505,7 @@ vp_base_squelch_max_selected_peers=2
[&](PublicKey const&, PeerWPtr, std::uint32_t) {});
// confirm the same number of messages
peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
BEAST_EXPECT(peers[0].count == (nMessages - 1));
// advance the clock
ManualClock::advance(reduce_relay::IDLED + seconds(1));
network_.overlay().updateSlotAndSquelch(
@@ -1534,7 +1515,7 @@ vp_base_squelch_max_selected_peers=2
[&](PublicKey const&, PeerWPtr, std::uint32_t) {});
peers = network_.overlay().getPeers(network_.validator(0));
// confirm message number increased
BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
BEAST_EXPECT(peers[0].count == nMessages);
});
}
@@ -1550,6 +1531,12 @@ vp_base_squelch_max_selected_peers=2
if (duration > maxDuration_)
maxDuration_ = duration;
}
void
squelchAll(PublicKey const&, std::uint32_t) override
{
}
void
unsquelch(PublicKey const&, Peer::id_t) const override
{
@@ -1582,10 +1569,7 @@ vp_base_squelch_max_selected_peers=2
std::uint64_t mid = m * 1000 + peer;
uint256 const message{mid};
slots.updateSlotAndSquelch(
message,
validator,
peer,
protocol::MessageType::mtVALIDATION);
message, validator, peer, true);
}
}
// make Slot's internal hash router expire all messages
@@ -1703,7 +1687,7 @@ vp_base_squelch_max_selected_peers=2
Network network_;
public:
reduce_relay_test()
base_squelch_test()
: env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
@@ -1716,7 +1700,7 @@ public:
void
run() override
{
bool log = false;
bool log = true;
testConfig(log);
testInitialRound(log);
testPeerUnsquelchedTooSoon(log);
@@ -1732,7 +1716,7 @@ public:
}
};
class reduce_relay_simulate_test : public reduce_relay_test
class base_squelch_simulate_test : public base_squelch_test
{
void
testRandom(bool log)
@@ -1748,8 +1732,8 @@ class reduce_relay_simulate_test : public reduce_relay_test
}
};
BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple);
BEAST_DEFINE_TESTSUITE(base_squelch, ripple_data, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(base_squelch_simulate, ripple_data, ripple);
} // namespace test

83
src/test/overlay/clock.h Normal file
View File

@@ -0,0 +1,83 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED
#define RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED
#include <xrpl/basics/random.h>
#include <chrono>
#include <cstdint>
#include <ratio>
namespace ripple {
namespace test {
using namespace std::chrono;
/** Manually advanced clock. */
class ManualClock
{
public:
typedef uint64_t rep;
typedef std::milli period;
typedef std::chrono::duration<std::uint32_t, period> duration;
typedef std::chrono::time_point<ManualClock> time_point;
inline static bool const is_steady = false;
static void
advance(duration d) noexcept
{
now_ += d;
}
static void
randAdvance(milliseconds min, milliseconds max)
{
now_ += randDuration(min, max);
}
static void
reset() noexcept
{
now_ = time_point(seconds(0));
}
static time_point
now() noexcept
{
return now_;
}
static duration
randDuration(milliseconds min, milliseconds max)
{
return duration(milliseconds(rand_int(min.count(), max.count())));
}
explicit ManualClock() = default;
private:
inline static time_point now_ = time_point(seconds(0));
};
} // namespace test
} // namespace ripple
#endif

View File

@@ -0,0 +1,759 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/Env.h>
#include <xrpld/overlay/Slot.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/protocol/SecretKey.h>
#include "test/overlay/clock.h"
#include "xrpld/overlay/Peer.h"
#include "xrpld/overlay/ReduceRelayCommon.h"
#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>
namespace ripple {
namespace test {
class TestHandler : public reduce_relay::SquelchHandler
{
public:
using squelch_method =
std::function<void(PublicKey const&, Peer::id_t, std::uint32_t)>;
using squelchAll_method =
std::function<void(PublicKey const&, std::uint32_t)>;
using unsquelch_method = std::function<void(PublicKey const&, Peer::id_t)>;
squelch_method squelch_f_;
squelchAll_method squelchAll_f_;
unsquelch_method unsquelch_f_;
TestHandler(
squelch_method const& squelch_f,
squelchAll_method const& squelchAll_f,
unsquelch_method const& unsquelch_f)
: squelch_f_(squelch_f)
, squelchAll_f_(squelchAll_f)
, unsquelch_f_(unsquelch_f)
{
}
TestHandler(TestHandler& copy)
{
squelch_f_ = copy.squelch_f_;
squelchAll_f_ = copy.squelchAll_f_;
unsquelch_f_ = copy.unsquelch_f_;
}
void
squelch(PublicKey const& validator, Peer::id_t peer, std::uint32_t duration)
const override
{
squelch_f_(validator, peer, duration);
}
void
squelchAll(PublicKey const& validator, std::uint32_t duration) override
{
squelchAll_f_(validator, duration);
}
void
unsquelch(PublicKey const& validator, Peer::id_t peer) const override
{
unsquelch_f_(validator, peer);
}
};
class enhanced_squelch_test : public beast::unit_test::suite
{
public:
TestHandler::squelch_method noop_squelch =
[&](PublicKey const&, Peer::id_t, std::uint32_t) {
BEAST_EXPECTS(false, "unexpected call to squelch handler");
};
TestHandler::squelchAll_method noop_squelchAll = [&](PublicKey const&,
std::uint32_t) {
BEAST_EXPECTS(false, "unexpected call to squelchAll handler");
};
TestHandler::unsquelch_method noop_unsquelch = [&](PublicKey const&,
Peer::id_t) {
BEAST_EXPECTS(false, "unexpected call to unsquelch handler");
};
// noop_handler is passed as a place holder Handler to slots
TestHandler noop_handler = {
noop_squelch,
noop_squelchAll,
noop_unsquelch,
};
jtx::Env env_;
enhanced_squelch_test() : env_(*this)
{
env_.app().config().VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = true;
}
void
testConfig()
{
testcase("Test Config - enabled enhanced squelching");
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enhanced_squelch_enable=1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == true);
toLoad = R"rippleConfig(
[reduce_relay]
vp_enhanced_squelch_enable=0
)rippleConfig";
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
toLoad = R"rippleConfig(
[reduce_relay]
)rippleConfig";
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
}
/** Tests tracking for squelched validators and peers */
void
testSquelchTracking()
{
testcase("squelchTracking");
Peer::id_t squelchedPeerID = 0;
Peer::id_t newPeerID = 1;
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
auto const publicKey = randomKeyPair(KeyType::ed25519).first;
// a new key should not be squelched
BEAST_EXPECTS(
!slots.validatorSquelched(publicKey), "validator squelched");
slots.squelchValidator(publicKey, squelchedPeerID);
// after squelching a peer, the validator must be squelched
BEAST_EXPECTS(
slots.validatorSquelched(publicKey), "validator not squelched");
// the peer must also be squelched
BEAST_EXPECTS(
slots.peerSquelched(publicKey, squelchedPeerID),
"peer not squelched");
// a new peer must not be squelched
BEAST_EXPECTS(
!slots.peerSquelched(publicKey, newPeerID), "new peer squelched");
// advance the manual clock to after expiration
ManualClock::advance(
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
std::chrono::seconds{11});
// validator squelch should expire
BEAST_EXPECTS(
!slots.validatorSquelched(publicKey),
"validator squelched after expiry");
// peer squelch should also expire
BEAST_EXPECTS(
!slots.peerSquelched(publicKey, squelchedPeerID),
"validator squelched after expiry");
}
void
testUpdateValidatorSlot_newValidator()
{
testcase("updateValidatorSlot_newValidator");
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
Peer::id_t const peerID = 1;
auto const validator = randomKeyPair(KeyType::ed25519).first;
uint256 message{0};
slots.updateValidatorSlot(message, validator, peerID);
// adding untrusted slot does not effect trusted slots
BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed");
// we expect that the validator was not added to untrusted slots
BEAST_EXPECTS(
slots.untrusted_slots_.size() == 0, "untrusted slot changed");
// we expect that the validator was added to th consideration list
BEAST_EXPECTS(
slots.considered_validators_.contains(validator),
"new validator was not considered");
}
void
testUpdateValidatorSlot_squelchedValidator()
{
testcase("testUpdateValidatorSlot_squelchedValidator");
Peer::id_t squelchedPeerID = 0;
Peer::id_t newPeerID = 1;
auto const validator = randomKeyPair(KeyType::ed25519).first;
TestHandler::squelch_method const squelch_f =
[&](PublicKey const& key, Peer::id_t id, std::uint32_t duration) {
BEAST_EXPECTS(
key == validator,
"squelch called for unknown validator key");
BEAST_EXPECTS(
id == newPeerID, "squelch called for the wrong peer");
};
TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch};
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler, env_.app().config());
slots.squelchValidator(validator, squelchedPeerID);
// this should not trigger squelch assertions, the peer is squelched
slots.updateValidatorSlot(
sha512Half(validator), validator, squelchedPeerID);
slots.updateValidatorSlot(sha512Half(validator), validator, newPeerID);
// the squelched peer remained squelched
BEAST_EXPECTS(
slots.peerSquelched(validator, squelchedPeerID),
"peer not squelched");
// because the validator was squelched, the new peer was also squelched
BEAST_EXPECTS(
slots.peerSquelched(validator, newPeerID),
"new peer was not squelched");
// a squelched validator must not be considered
BEAST_EXPECTS(
!slots.considered_validators_.contains(validator),
"squelched validator was added for consideration");
}
void
testUpdateValidatorSlot_slotsFull()
{
testcase("updateValidatorSlot_slotsFull");
Peer::id_t const peerID = 1;
// while there are open untrusted slots, no calls should be made to
// squelch any validators
TestHandler handler{noop_handler};
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler, env_.app().config());
// saturate validator slots
auto const validators = fillUntrustedSlots(slots);
// adding untrusted slot does not effect trusted slots
BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed");
// simulate additional messages from already selected validators
for (auto const& validator : validators)
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD; ++i)
slots.updateValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i),
validator,
peerID);
// an untrusted slot was added for each validator
BEAST_EXPECT(
slots.untrusted_slots_.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
for (auto const& validator : validators)
BEAST_EXPECTS(
!slots.validatorSquelched(validator),
"selected validator was squelched");
auto const newValidator = randomKeyPair(KeyType::ed25519).first;
// once slots are full squelchAll must be called for new peer/validator
handler.squelchAll_f_ = [&](PublicKey const& key, std::uint32_t) {
BEAST_EXPECTS(
key == newValidator, "unexpected validator squelched");
slots.squelchValidator(key, peerID);
};
slots.updateValidatorSlot(
sha512Half(newValidator), newValidator, peerID);
// Once the slots are saturated every other validator is squelched
BEAST_EXPECTS(
slots.validatorSquelched(newValidator),
"untrusted validator not squelched");
BEAST_EXPECTS(
slots.peerSquelched(newValidator, peerID),
"peer for untrusted validator not squelched");
}
void
testDeleteIdlePeers_deleteIdleSlots()
{
testcase("deleteIdlePeers");
TestHandler handler{noop_handler};
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler, env_.app().config());
auto keys = fillUntrustedSlots(slots);
// verify that squelchAll is called for each idled slot validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration) {
for (auto it = keys.begin(); it != keys.end(); ++it)
{
if (*it == actualKey)
{
keys.erase(it);
return;
}
}
BEAST_EXPECTS(false, "unexpected key passed to squelchAll");
};
BEAST_EXPECTS(
slots.untrusted_slots_.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
"unexpected number of untrusted slots");
// advance the manual clock to after slot expiration
ManualClock::advance(
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
std::chrono::seconds{1});
slots.deleteIdlePeers();
BEAST_EXPECTS(
slots.untrusted_slots_.size() == 0,
"unexpected number of untrusted slots");
BEAST_EXPECTS(keys.empty(), "not all validators were squelched");
}
void
testDeleteIdlePeers_deleteIdleUntrustedPeer()
{
testcase("deleteIdleUntrustedPeer");
Peer::id_t const peerID = 1;
Peer::id_t const peerID2 = 2;
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
// fill one untrustd validator slot
auto const validator = fillUntrustedSlots(slots, 1)[0];
BEAST_EXPECTS(
slots.untrusted_slots_.size() == 1,
"unexpected number of untrusted slots");
slots.updateSlotAndSquelch(
sha512Half(validator) + static_cast<uint256>(100),
validator,
peerID,
false);
slots.updateSlotAndSquelch(
sha512Half(validator) + static_cast<uint256>(100),
validator,
peerID2,
false);
slots.deletePeer(peerID, true);
auto const slotPeers = getUntrustedSlotPeers(validator, slots);
BEAST_EXPECTS(
slotPeers.size() == 1, "untrusted validator slot is missing");
BEAST_EXPECTS(
!slotPeers.contains(peerID),
"peer was not removed from untrusted slots");
BEAST_EXPECTS(
slotPeers.contains(peerID2),
"peer was removed from untrusted slots");
}
/** Test that untrusted validator slots are correctly updated by
* updateSlotAndSquelch
*/
void
testUpdateSlotAndSquelch_untrustedValidator()
{
testcase("updateUntrsutedValidatorSlot");
TestHandler handler{noop_handler};
handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {};
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler, env_.app().config());
// peers that will be source of validator messages
std::vector<Peer::id_t> peers = {};
// prepare n+1 peers, we expect the n+1st peer will be squelched
for (int i = 0; i <
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + 1;
++i)
peers.push_back(i);
auto const validator = fillUntrustedSlots(slots, 1)[0];
// Squelching logic resets all counters each time a new peer is added
// Therfore we need to populate counters for each peer before sending
// new messages
for (auto const& peer : peers)
{
auto const now = ManualClock::now();
slots.updateSlotAndSquelch(
sha512Half(validator) +
static_cast<uint256>(now.time_since_epoch().count()),
validator,
peer,
false);
ManualClock::advance(std::chrono::milliseconds{10});
}
// simulate new, unique validator messages sent by peers
for (auto const& peer : peers)
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
{
auto const now = ManualClock::now();
slots.updateSlotAndSquelch(
sha512Half(validator) +
static_cast<uint256>(now.time_since_epoch().count()),
validator,
peer,
false);
ManualClock::advance(std::chrono::milliseconds{10});
}
auto const slotPeers = getUntrustedSlotPeers(validator, slots);
BEAST_EXPECTS(
slotPeers.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS +
1,
"untrusted validator slot is missing");
int selected = 0;
int squelched = 0;
for (auto const& [_, info] : slotPeers)
{
switch (info.state)
{
case reduce_relay::PeerState::Selected:
++selected;
break;
case reduce_relay::PeerState::Squelched:
++squelched;
break;
case reduce_relay::PeerState::Counting:
BEAST_EXPECTS(
false, "peer should not be in counting state");
}
}
BEAST_EXPECTS(squelched == 1, "expected one squelched peer");
BEAST_EXPECTS(
selected ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
"wrong number of peers selected");
}
void
testUpdateConsideredValidator_newValidator()
{
testcase("testUpdateConsideredValidator_newValidator");
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
// insert some random validator key
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t const peerID = 0;
Peer::id_t const peerID2 = 1;
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
BEAST_EXPECTS(
slots.considered_validators_.contains(validator),
"new validator was not added for consideration");
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
// expect that a peer will be registered once as a message source
BEAST_EXPECTS(
slots.considered_validators_.at(validator).peers.size() == 1,
"duplicate peer was registered");
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID2),
"validator was selected with insufficient number of peers");
// expect that each distinct peer will be registered
BEAST_EXPECTS(
slots.considered_validators_.at(validator).peers.size() == 2,
"distinct peers were not registered");
}
void
testUpdateConsideredValidator_idleValidator()
{
testcase("testUpdateConsideredValidator_idleValidator");
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
// insert some random validator key
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t peerID = 0;
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
BEAST_EXPECTS(
slots.considered_validators_.contains(validator),
"new validator was not added for consideration");
auto const state = slots.considered_validators_.at(validator);
// simulate a validator sending a new message before the idle timer
ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
auto const newState = slots.considered_validators_.at(validator);
BEAST_EXPECTS(
state.count + 1 == newState.count,
"non-idling validator was updated");
// simulate a validator idling
ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
auto const idleState = slots.considered_validators_.at(validator);
// we expect that an idling validator will not be updated
BEAST_EXPECTS(
newState.count == idleState.count, "idling validator was updated");
}
void
testUpdateConsideredValidator_selectQualifyingValidator()
{
testcase("testUpdateConsideredValidator_selectQualifyingValidator");
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
// insert some random validator key
auto const validator = randomKeyPair(KeyType::ed25519).first;
auto const validator2 = randomKeyPair(KeyType::ed25519).first;
Peer::id_t peerID = 0;
Peer::id_t peerID2 =
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
// a validator that sends only unique messages, but only from one peer
// must not be selected
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
{
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected before reaching message threshold");
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator2, peerID),
"validator was selected before reaching message threshold");
ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
}
// as long as the peer criteria is not met, the validator most not be
// selected
for (int i = 1; i <
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS - 1;
++i)
{
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, i),
"validator was selected before reaching enough peers");
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator2, i),
"validator was selected before reaching enough peers");
ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
}
auto const consideredValidator =
slots.updateConsideredValidator(validator, peerID2);
BEAST_EXPECTS(
consideredValidator && *consideredValidator == validator,
"expected validator was not selected");
// expect that selected peer was removed
BEAST_EXPECTS(
!slots.considered_validators_.contains(validator),
"selected validator was not removed from considered list");
BEAST_EXPECTS(
slots.considered_validators_.contains(validator2),
"unqualified validator was removed from considered list");
}
void
testCleanConsideredValidators_deleteIdleValidator()
{
testcase("cleanConsideredValidators_deleteIdleValidator");
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), noop_handler, env_.app().config());
// insert some random validator key
auto const lateValidator = randomKeyPair(KeyType::ed25519).first;
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t peerID = 0;
BEAST_EXPECTS(
!slots.updateConsideredValidator(lateValidator, peerID),
"validator was selected with insufficient number of peers");
BEAST_EXPECTS(
slots.considered_validators_.contains(lateValidator),
"new validator was not added for consideration");
// simulate a validator idling
ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
auto const invalidValidators = slots.cleanConsideredValidators();
BEAST_EXPECTS(
invalidValidators.size() == 1,
"unexpected number of invalid validators");
BEAST_EXPECTS(
invalidValidators[0] == lateValidator, "removed invalid validator");
BEAST_EXPECTS(
!slots.considered_validators_.contains(lateValidator),
"late validator was not removed");
BEAST_EXPECTS(
slots.considered_validators_.contains(validator),
"timely validator was removed");
}
private:
/** A helper method to fill untrusted slots of a given Slots instance
* with random validator messages*/
std::vector<PublicKey>
fillUntrustedSlots(
reduce_relay::Slots<ManualClock>& slots,
int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS)
{
std::vector<PublicKey> keys;
for (int i = 0; i < maxSlots; ++i)
{
auto const validator = randomKeyPair(KeyType::ed25519).first;
keys.push_back(validator);
for (int j = 0; j <
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
++j)
// send enough messages so that a validator slot is selected
for (int k = 0; k < reduce_relay::MAX_MESSAGE_THRESHOLD; ++k)
slots.updateValidatorSlot(
sha512Half(validator) + static_cast<uint256>(k),
validator,
j);
}
return keys;
}
std::unordered_map<Peer::id_t, reduce_relay::Slot<ManualClock>::PeerInfo>
getUntrustedSlotPeers(
PublicKey const& validator,
reduce_relay::Slots<ManualClock> const& slots)
{
auto const& it = slots.untrusted_slots_.find(validator);
if (it == slots.untrusted_slots_.end())
return {};
auto r = std::unordered_map<
Peer::id_t,
reduce_relay::Slot<ManualClock>::PeerInfo>();
for (auto const& [id, info] : it->second.peers_)
r.emplace(std::make_pair(id, info));
return r;
}
void
run() override
{
testConfig();
testSquelchTracking();
testUpdateValidatorSlot_newValidator();
testUpdateValidatorSlot_slotsFull();
testUpdateValidatorSlot_squelchedValidator();
testDeleteIdlePeers_deleteIdleSlots();
testDeleteIdlePeers_deleteIdleUntrustedPeer();
testUpdateSlotAndSquelch_untrustedValidator();
testUpdateConsideredValidator_newValidator();
testUpdateConsideredValidator_idleValidator();
testUpdateConsideredValidator_selectQualifyingValidator();
testCleanConsideredValidators_deleteIdleValidator();
}
};
BEAST_DEFINE_TESTSUITE(enhanced_squelch, overlay, ripple);
} // namespace test
} // namespace ripple

View File

@@ -254,6 +254,9 @@ public:
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5;
///////////////// END OF TEMPORARY CODE BLOCK /////////////////////
// Enable enhanced squelching of unique untrusted validator messages
bool VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = false;
// Transaction reduce-relay feature
bool TX_REDUCE_RELAY_ENABLE = false;
// If tx reduce-relay feature is disabled

View File

@@ -775,6 +775,9 @@ Config::loadFromString(std::string const& fileContents)
"greater than or equal to 3");
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE =
sec.value_or("vp_enhanced_squelch_enable", false);
TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);

View File

@@ -40,7 +40,7 @@ static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600};
static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10);
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600};
// No message received threshold before identifying a peer as idled
static constexpr auto IDLED = std::chrono::seconds{8};
static constexpr auto IDLED = std::chrono::seconds{5};
// Message count threshold to start selecting peers as the source
// of messages from the validator. We add peers who reach
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
@@ -49,6 +49,8 @@ static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19;
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20;
// Max selected peers to choose as the source of messages from validator
static constexpr uint16_t MAX_SELECTED_PEERS = 5;
// Max number of untrusted slots the server will maintain
static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5;
// Wait before reduce-relay feature is enabled on boot up to let
// the server establish peer connections
static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};

View File

@@ -29,17 +29,25 @@
#include <xrpl/basics/random.h>
#include <xrpl/beast/container/aged_unordered_map.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/PropertyStream.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/protocol/messages.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <optional>
#include <set>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
namespace ripple {
// used to make private members of Slots class accessible for testing
namespace test {
class enhanced_squelch_test;
class base_squelch_test;
class OverlaySim;
} // namespace test
namespace reduce_relay {
@@ -52,12 +60,42 @@ enum class PeerState : uint8_t {
Selected, // selected to relay, counting if Slot in Counting
Squelched, // squelched, doesn't relay
};
inline std::string
to_string(PeerState state)
{
switch (state)
{
case PeerState::Counting:
return "counting";
case PeerState::Selected:
return "selected";
case PeerState::Squelched:
return "squelched";
default:
return "unknown";
}
}
/** Slot's State */
enum class SlotState : uint8_t {
Counting, // counting messages
Selected, // peers selected, stop counting
};
inline std::string
to_string(SlotState state)
{
switch (state)
{
case SlotState::Counting:
return "counting";
case SlotState::Selected:
return "selected";
default:
return "unknown";
}
}
template <typename Unit, typename TP>
Unit
epoch(TP const& t)
@@ -75,7 +113,7 @@ public:
virtual ~SquelchHandler()
{
}
/** Squelch handler
/** Squelch handler for a single peer
* @param validator Public key of the source validator
* @param id Peer's id to squelch
* @param duration Squelch duration in seconds
@@ -83,6 +121,15 @@ public:
virtual void
squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration)
const = 0;
/** Squelch for all peers, the method must call slots.squelchValidator
* to register that a (validator,peer) was squelched
* @param validator Public key of the source validator
* @param duration Squelch duration in seconds
*/
virtual void
squelchAll(PublicKey const& validator, std::uint32_t duration) = 0;
/** Unsquelch handler
* @param validator Public key of the source validator
* @param id Peer's id to unsquelch
@@ -104,8 +151,10 @@ public:
template <typename clock_type>
class Slot final
{
private:
friend class Slots<clock_type>;
friend class test::enhanced_squelch_test;
friend class test::OverlaySim;
using id_t = Peer::id_t;
using time_point = typename clock_type::time_point;
@@ -121,13 +170,15 @@ private:
Slot(
SquelchHandler const& handler,
beast::Journal journal,
uint16_t maxSelectedPeers)
uint16_t maxSelectedPeers,
bool isTrusted)
: reachedThreshold_(0)
, lastSelected_(clock_type::now())
, state_(SlotState::Counting)
, handler_(handler)
, journal_(journal)
, maxSelectedPeers_(maxSelectedPeers)
, isTrusted_(isTrusted)
{
}
@@ -139,22 +190,19 @@ private:
* MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
* number of considered peers who reached MAX_MESSAGE_THRESHOLD is
* maxSelectedPeers_ then randomly select maxSelectedPeers_ from
* considered peers, and call squelch handler for each peer, which is not
* selected and not already in Squelched state. Set the state for those
* peers to Squelched and reset the count of all peers. Set slot's state to
* Selected. Message count is not updated when the slot is in Selected
* state.
* considered peers, and call squelch handler for each peer, which is
* not selected and not already in Squelched state. Set the state for
* those peers to Squelched and reset the count of all peers. Set slot's
* state to Selected. Message count is not updated when the slot is in
* Selected state.
* @param validator Public key of the source validator
* @param id Peer id which received the message
* @param type Message type (Validation and Propose Set only,
* others are ignored, future use)
* @param callback A callback to report ignored squelches
*/
void
update(
PublicKey const& validator,
id_t id,
protocol::MessageType type,
ignored_squelch_callback callback);
/** Handle peer deletion when a peer disconnects.
@@ -177,32 +225,6 @@ private:
return lastSelected_;
}
/** Return number of peers in state */
std::uint16_t
inState(PeerState state) const;
/** Return number of peers not in state */
std::uint16_t
notInState(PeerState state) const;
/** Return Slot's state */
SlotState
getState() const
{
return state_;
}
/** Return selected peers */
std::set<id_t>
getSelected() const;
/** Get peers info. Return map of peer's state, count, squelch
* expiration milsec, and last message time milsec.
*/
std::
unordered_map<id_t, std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
getPeers() const;
/** Check if peers stopped relaying messages. If a peer is
* selected peer then call unsquelch handler for all
* currently squelched peers and switch the slot to
@@ -220,7 +242,6 @@ private:
std::chrono::seconds
getSquelchDuration(std::size_t npeers);
private:
/** Reset counts of peers in Selected or Counting state */
void
resetCounts();
@@ -229,13 +250,18 @@ private:
void
initCounting();
void
onWrite(beast::PropertyStream::Map& stream) const;
/** Data maintained for each peer */
struct PeerInfo
{
PeerState state; // peer's state
std::size_t count; // message count
time_point expire; // squelch expiration time
time_point lastMessage; // time last message received
PeerState state; // peer's state
std::size_t count; // message count
time_point expire; // squelch expiration time
time_point lastMessage; // time last message received
std::size_t timesSelected; // number of times the peer was selected
std::size_t timesCloseToThreshold;
};
std::unordered_map<id_t, PeerInfo> peers_; // peer's data
@@ -257,6 +283,9 @@ private:
// the maximum number of peers that should be selected as a validator
// message source
uint16_t const maxSelectedPeers_;
// indicate if the slot is for a trusted validator
bool const isTrusted_;
};
template <typename clock_type>
@@ -287,7 +316,6 @@ void
Slot<clock_type>::update(
PublicKey const& validator,
id_t id,
protocol::MessageType type,
ignored_squelch_callback callback)
{
using namespace std::chrono;
@@ -298,8 +326,15 @@ Slot<clock_type>::update(
{
JLOG(journal_.trace())
<< "update: adding peer " << Slice(validator) << " " << id;
peers_.emplace(
std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
peers_.emplace(std::make_pair(
id,
PeerInfo{
.state = PeerState::Counting,
.count = 0,
.expire = now,
.lastMessage = now,
.timesSelected = 0,
.timesCloseToThreshold = 0}));
initCounting();
return;
}
@@ -321,8 +356,10 @@ Slot<clock_type>::update(
<< " slot state " << static_cast<int>(state_) << " peer state "
<< static_cast<int>(peer.state) << " count " << peer.count << " last "
<< duration_cast<milliseconds>(now - peer.lastMessage).count()
<< " pool " << considered_.size() << " threshold " << reachedThreshold_
<< " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
<< " pool " << considered_.size() << " threshold " << reachedThreshold_;
if (now - peer.lastMessage - IDLED <= milliseconds{500})
++peer.timesCloseToThreshold;
peer.lastMessage = now;
@@ -349,6 +386,16 @@ Slot<clock_type>::update(
if (reachedThreshold_ == maxSelectedPeers_)
{
for (auto const& [id, info] : peers_)
{
if (info.state == PeerState::Selected &&
info.count < MIN_MESSAGE_THRESHOLD)
{
JLOG(journal_.debug())
<< "update: previously selected peer " << id
<< " failed to reach a threshold with: " << info.count;
}
}
// Randomly select maxSelectedPeers_ peers from considered.
// Exclude peers that have been idling > IDLED -
// it's possible that deleteIdlePeer() has not been called yet.
@@ -403,9 +450,18 @@ Slot<clock_type>::update(
v.count = 0;
if (selected.find(k) != selected.end())
{
v.state = PeerState::Selected;
++v.timesSelected;
}
else if (v.state != PeerState::Squelched)
{
if (v.state == PeerState::Selected)
{
JLOG(journal_.debug())
<< "squelching previously selected peer";
}
if (journal_.trace())
str << k << " ";
v.state = PeerState::Squelched;
@@ -482,6 +538,35 @@ Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase)
}
}
template <typename clock_type>
void
Slot<clock_type>::onWrite(beast::PropertyStream::Map& stream) const
{
auto const now = clock_type::now();
stream["state"] = to_string(state_);
stream["reachedThreshold"] = reachedThreshold_;
stream["considered"] = considered_.size();
stream["lastSelected"] =
duration_cast<std::chrono::seconds>(now - lastSelected_).count();
stream["isTrusted"] = isTrusted_;
beast::PropertyStream::Set peers("peers", stream);
for (auto const& [id, info] : peers_)
{
beast::PropertyStream::Map item(peers);
item["id"] = id;
item["count"] = info.count;
item["expire"] =
duration_cast<std::chrono::seconds>(info.expire - now).count();
item["lastMessage"] =
duration_cast<std::chrono::seconds>(now - info.lastMessage).count();
item["timesSelected"] = info.timesSelected;
item["timesCloseToThreshold"] = info.timesCloseToThreshold;
item["state"] = to_string(info.state);
}
}
template <typename clock_type>
void
Slot<clock_type>::resetCounts()
@@ -503,65 +588,18 @@ Slot<clock_type>::initCounting()
resetCounts();
}
template <typename clock_type>
std::uint16_t
Slot<clock_type>::inState(PeerState state) const
{
return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
return (it.second.state == state);
});
}
template <typename clock_type>
std::uint16_t
Slot<clock_type>::notInState(PeerState state) const
{
return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
return (it.second.state != state);
});
}
template <typename clock_type>
std::set<typename Peer::id_t>
Slot<clock_type>::getSelected() const
{
std::set<id_t> r;
for (auto const& [id, info] : peers_)
if (info.state == PeerState::Selected)
r.insert(id);
return r;
}
template <typename clock_type>
std::unordered_map<
typename Peer::id_t,
std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
Slot<clock_type>::getPeers() const
{
using namespace std::chrono;
auto r = std::unordered_map<
id_t,
std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
for (auto const& [id, info] : peers_)
r.emplace(std::make_pair(
id,
std::move(std::make_tuple(
info.state,
info.count,
epoch<milliseconds>(info.expire).count(),
epoch<milliseconds>(info.lastMessage).count()))));
return r;
}
/** Slots is a container for validator's Slot and handles Slot update
* when a message is received from a validator. It also handles Slot aging
* and checks for peers which are disconnected or stopped relaying the messages.
* and checks for peers which are disconnected or stopped relaying the
* messages.
*/
template <typename clock_type>
class Slots final
{
friend class test::enhanced_squelch_test;
friend class test::base_squelch_test;
friend class test::OverlaySim;
using time_point = typename clock_type::time_point;
using id_t = typename Peer::id_t;
using messages = beast::aged_unordered_map<
@@ -569,6 +607,12 @@ class Slots final
std::unordered_set<Peer::id_t>,
clock_type,
hardened_hash<strong_hash>>;
using validators = beast::aged_unordered_map<
PublicKey,
std::unordered_set<Peer::id_t>,
clock_type,
hardened_hash<strong_hash>>;
using slots_map = hash_map<PublicKey, Slot<clock_type>>;
public:
/**
@@ -576,14 +620,17 @@ public:
* @param handler Squelch/unsquelch implementation
* @param config reference to the global config
*/
Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
Slots(Logs& logs, SquelchHandler& handler, Config const& config)
: handler_(handler)
, logs_(logs)
, journal_(logs.journal("Slots"))
, baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
, maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
, enhancedSquelchEnabled_(
config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE)
{
}
~Slots() = default;
/** Check if base squelching feature is enabled and ready */
@@ -593,6 +640,13 @@ public:
return baseSquelchEnabled_ && reduceRelayReady();
}
/** Check if enhanced squelching feature is enabled and ready */
bool
enhancedSquelchReady()
{
return enhancedSquelchEnabled_ && reduceRelayReady();
}
/** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */
bool
reduceRelayReady()
@@ -605,108 +659,71 @@ public:
return reduceRelayReady_;
}
/** Calls Slot::update of Slot associated with the validator, with a noop
* callback.
/** Updates untrusted validator slot. Do not call for trusted
* validators. The caller must ensure passed messages are unique.
* @param key Message hash
* @param validator Validator public key
* @param id The ID of the peer that sent the message
*/
void
updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id)
{
updateValidatorSlot(key, validator, id, []() {});
}
/** Updates untrusted validator slot. Do not call for trusted
* validators. The caller must ensure passed messages are unique.
* @param key Message hash
* @param validator Validator public key
* @param id The ID of the peer that sent the message
* @param callback A callback to report ignored validations
*/
void
updateValidatorSlot(
uint256 const& key,
PublicKey const& validator,
id_t id,
typename Slot<clock_type>::ignored_squelch_callback callback);
/** Calls Slot::update of Slot associated with the validator, with a
* noop callback.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
* @param type Received protocol message type
* @param isTrusted Boolean to indicate if the message is from a trusted
* validator
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type)
bool isTrusted)
{
updateSlotAndSquelch(key, validator, id, type, []() {});
updateSlotAndSquelch(key, validator, id, []() {}, isTrusted);
}
/** Calls Slot::update of Slot associated with the validator.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
* @param type Received protocol message type
* @param callback A callback to report ignored validations
* @param isTrusted Boolean to indicate if the message is from a trusted
* validator
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type,
typename Slot<clock_type>::ignored_squelch_callback callback);
typename Slot<clock_type>::ignored_squelch_callback callback,
bool isTrusted);
/** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator.
*/
void
deleteIdlePeers();
/** Return number of peers in state */
std::optional<std::uint16_t>
inState(PublicKey const& validator, PeerState state) const
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.inState(state);
return {};
}
/** Return number of peers not in state */
std::optional<std::uint16_t>
notInState(PublicKey const& validator, PeerState state) const
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.notInState(state);
return {};
}
/** Return true if Slot is in state */
bool
inState(PublicKey const& validator, SlotState state) const
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.state_ == state;
return false;
}
/** Get selected peers */
std::set<id_t>
getSelected(PublicKey const& validator)
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.getSelected();
return {};
}
/** Get peers info. Return map of peer's state, count, and squelch
* expiration milliseconds.
*/
std::unordered_map<
typename Peer::id_t,
std::tuple<PeerState, uint16_t, uint32_t, std::uint32_t>>
getPeers(PublicKey const& validator)
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.getPeers();
return {};
}
/** Get Slot's state */
std::optional<SlotState>
getState(PublicKey const& validator)
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.getState();
return {};
}
/** Called when a peer is deleted. If the peer was selected to be the
* source of messages from the validator then squelched peers have to be
* unsquelched.
@@ -716,6 +733,26 @@ public:
void
deletePeer(id_t id, bool erase);
/** Called to register that a given validator was squelched for a given
* peer. It is expected that this method is called by SquelchHandler.
*
* @param key Validator public key
* @param id peer ID
*/
void
squelchValidator(PublicKey const& key, id_t id)
{
auto it = peersWithValidators_.find(key);
if (it == peersWithValidators_.end())
peersWithValidators_.emplace(key, std::unordered_set<id_t>{id});
else if (it->second.find(id) == it->second.end())
it->second.insert(id);
}
void
onWrite(beast::PropertyStream::Map& stream) const;
private:
/** Add message/peer if have not seen this message
* from the peer. A message is aged after IDLED seconds.
@@ -723,15 +760,69 @@ private:
bool
addPeerMessage(uint256 const& key, id_t id);
/**
* Updates the last message sent from a validator.
* @param validator The validator public key
* @param peer The peer ID sending the message
* @return true if the validator was updated, false otherwise
*/
std::optional<PublicKey>
updateConsideredValidator(PublicKey const& validator, Peer::id_t peer);
/** Remove all validators that have become invalid due to selection
* criteria
* @return zero or more validators that have been removed.
*/
std::vector<PublicKey>
cleanConsideredValidators();
/** Checks whether a given validator is squelched.
* @param key Validator public key
* @return true if a given validator was squelched
*/
bool
validatorSquelched(PublicKey const& key) const
{
beast::expire(
peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
return peersWithValidators_.find(key) != peersWithValidators_.end();
}
/** Checks whether a given peer was recently sent a squelch message for
* a given validator.
* @param key Validator public key
* @param id Peer id
* @return true if a given validator was squelched for a given peeru
*/
bool
peerSquelched(PublicKey const& key, id_t id) const
{
beast::expire(
peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
auto const it = peersWithValidators_.find(key);
// if validator was not squelched, the peer was also not squelched
if (it == peersWithValidators_.end())
return false;
// if a peer is found the squelch for it has not expired
return it->second.find(id) != it->second.end();
}
std::atomic_bool reduceRelayReady_{false};
hash_map<PublicKey, Slot<clock_type>> slots_;
SquelchHandler const& handler_; // squelch/unsquelch handler
slots_map slots_;
slots_map untrusted_slots_;
SquelchHandler& handler_; // squelch/unsquelch handler
Logs& logs_;
beast::Journal const journal_;
bool const baseSquelchEnabled_;
uint16_t const maxSelectedPeers_;
bool const enhancedSquelchEnabled_;
// Maintain aged container of message/peers. This is required
// to discard duplicate message from the same peer. A message
@@ -739,6 +830,22 @@ private:
// after it was relayed is ignored by PeerImp.
inline static messages peersWithMessage_{
beast::get_abstract_clock<clock_type>()};
// Maintain aged container of validator/peers. This is used to track
// which validator/peer were squelced. A peer that whose squelch
// has expired is removed.
inline static validators peersWithValidators_{
beast::get_abstract_clock<clock_type>()};
struct ValidatorInfo
{
size_t count; // the number of messages sent from this validator
time_point lastMessage; // timestamp of the last message
std::unordered_set<Peer::id_t> peers; // a list of peer IDs that sent a
// message for this validator
};
hash_map<PublicKey, ValidatorInfo> considered_validators_;
};
template <typename clock_type>
@@ -774,61 +881,272 @@ Slots<clock_type>::addPeerMessage(uint256 const& key, id_t id)
return true;
}
template <typename clock_type>
std::optional<PublicKey>
Slots<clock_type>::updateConsideredValidator(
PublicKey const& validator,
Peer::id_t peer)
{
auto const now = clock_type::now();
auto it = considered_validators_.find(validator);
if (it == considered_validators_.end())
{
considered_validators_.emplace(std::make_pair(
validator,
ValidatorInfo{
.count = 1,
.lastMessage = now,
.peers = {peer},
}));
return {};
}
// the validator idled. Don't update it, it will be cleaned later
if (now - it->second.lastMessage > IDLED)
return {};
it->second.peers.insert(peer);
it->second.lastMessage = now;
++it->second.count;
if (it->second.count < MAX_MESSAGE_THRESHOLD ||
it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS)
return {};
auto const key = it->first;
considered_validators_.erase(it);
return key;
}
template <typename clock_type>
std::vector<PublicKey>
Slots<clock_type>::cleanConsideredValidators()
{
auto const now = clock_type::now();
std::vector<PublicKey> keys;
for (auto it = considered_validators_.begin();
it != considered_validators_.end();)
{
if (now - it->second.lastMessage > IDLED)
{
keys.push_back(it->first);
it = considered_validators_.erase(it);
}
else
++it;
}
return keys;
}
template <typename clock_type>
void
Slots<clock_type>::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type,
typename Slot<clock_type>::ignored_squelch_callback callback)
typename Slot<clock_type>::ignored_squelch_callback callback,
bool isTrusted)
{
if (!addPeerMessage(key, id))
return;
auto it = slots_.find(validator);
if (it == slots_.end())
// If we receive a message from a trusted validator either update an
// existing slot or insert a new one. If we are not running enhanced
// squelching also deduplicate untrusted validator messages
if (isTrusted || !enhancedSquelchEnabled_)
{
JLOG(journal_.trace())
<< "updateSlotAndSquelch: new slot " << Slice(validator);
auto it =
slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(
handler_, logs_.journal("Slot"), maxSelectedPeers_)))
.first;
it->second.update(validator, id, type, callback);
auto it = slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(
handler_,
logs_.journal("Slot"),
maxSelectedPeers_,
isTrusted)))
.first;
it->second.update(validator, id, callback);
}
else
it->second.update(validator, id, type, callback);
{
auto it = untrusted_slots_.find(validator);
// If we received a message from a validator that is not
// selected, and is not squelched, there is nothing to do. It
// will be squelched later when `updateValidatorSlot` is called.
if (it == untrusted_slots_.end())
return;
it->second.update(validator, id, callback);
}
}
template <typename clock_type>
void
Slots<clock_type>::updateValidatorSlot(
uint256 const& key,
PublicKey const& validator,
id_t id,
typename Slot<clock_type>::ignored_squelch_callback callback)
{
// We received a message from an already selected validator
// we can ignore this message
if (untrusted_slots_.find(validator) != untrusted_slots_.end())
return;
// We received a message from an already squelched validator.
// This could happen in few cases:
// 1. It happened so that the squelch for a particular peer expired
// before our local squelch.
// 2. We receive a message from a new peer that did not receive the
// squelch request.
// 3. The peer is ignoring our squelch request and we have not sent
// the controll message in a while.
// In all of these cases we can only send them a squelch request again.
if (validatorSquelched(validator))
{
if (!peerSquelched(validator, id))
{
squelchValidator(validator, id);
handler_.squelch(
validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
}
return;
}
// update a slot if the message is from a selected untrusted validator
if (auto const& it = untrusted_slots_.find(validator);
it != untrusted_slots_.end())
{
it->second.update(validator, id, callback);
return;
}
// Do we have any available slots for additional untrusted validators?
// This could happen in few cases:
// 1. We received a message from a new untrusted validator, but we
// are at capacity.
// 2. We received a message from a previously squelched validator.
// In all of these cases we send a squelch message to all peers.
// The validator may still be considered by the selector. However, it
// will be eventually cleaned and squelched
if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS)
{
handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
return;
}
if (auto const v = updateConsideredValidator(validator, id))
untrusted_slots_.emplace(std::make_pair(
*v,
Slot<clock_type>(
handler_, logs_.journal("Slot"), maxSelectedPeers_, false)));
// When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them.
// Since we stop updating their counters, they will idle, and will be
// removed and squelched.
}
template <typename clock_type>
void
Slots<clock_type>::deletePeer(id_t id, bool erase)
{
for (auto& [validator, slot] : slots_)
slot.deletePeer(validator, id, erase);
auto deletePeer = [&](slots_map& slots) {
for (auto& [validator, slot] : slots)
slot.deletePeer(validator, id, erase);
};
deletePeer(slots_);
deletePeer(untrusted_slots_);
}
template <typename clock_type>
void
Slots<clock_type>::deleteIdlePeers()
{
auto now = clock_type::now();
auto deleteSlots = [&](slots_map& slots) {
auto const now = clock_type::now();
for (auto it = slots_.begin(); it != slots_.end();)
{
it->second.deleteIdlePeer(it->first);
if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
for (auto it = slots.begin(); it != slots.end();)
{
JLOG(journal_.trace())
<< "deleteIdlePeers: deleting idle slot " << Slice(it->first);
it = slots_.erase(it);
it->second.deleteIdlePeer(it->first);
if (now - it->second.getLastSelected() >
MAX_UNSQUELCH_EXPIRE_DEFAULT)
{
JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot "
<< Slice(it->first);
// if an untrusted validator slot idled - peers stopped
// sending messages for this validator squelch it
if (!it->second.isTrusted_)
handler_.squelchAll(
it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
it = slots.erase(it);
}
else
++it;
}
};
deleteSlots(slots_);
deleteSlots(untrusted_slots_);
// remove and squelch all validators that the selector deemed unsuitable
// there might be some good validators in this set that "lapsed".
// However, since these are untrusted validators we're not concerned
for (auto const& validator : cleanConsideredValidators())
handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
}
template <typename clock_type>
void
Slots<clock_type>::onWrite(beast::PropertyStream::Map& stream) const
{
auto const writeSlot =
[](beast::PropertyStream::Set& set,
hash_map<PublicKey, Slot<clock_type>> const& slots) {
for (auto const& [validator, slot] : slots)
{
beast::PropertyStream::Map item(set);
item["validator"] = toBase58(TokenType::NodePublic, validator);
slot.onWrite(item);
}
};
beast::PropertyStream::Map slots("slots", stream);
{
beast::PropertyStream::Set set("trusted", slots);
writeSlot(set, slots_);
}
{
beast::PropertyStream::Set set("untrusted", slots);
writeSlot(set, untrusted_slots_);
}
{
beast::PropertyStream::Set set("considered", slots);
auto const now = clock_type::now();
for (auto const& [validator, info] : considered_validators_)
{
beast::PropertyStream::Map item(set);
item["validator"] = toBase58(TokenType::NodePublic, validator);
item["lastMessage"] =
std::chrono::duration_cast<std::chrono::seconds>(
now - info.lastMessage)
.count();
item["messageCount"] = info.count;
item["peers"] = info.peers.size();
}
else
++it;
}
}

View File

@@ -22,12 +22,11 @@
#include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h>
#include <algorithm>
#include <chrono>
#include <functional>
namespace ripple {
@@ -108,7 +107,7 @@ template <typename clock_type>
bool
Squelch<clock_type>::expireSquelch(PublicKey const& validator)
{
auto now = clock_type::now();
auto const now = clock_type::now();
auto const& it = squelched_.find(validator);
if (it == squelched_.end())

View File

@@ -578,16 +578,23 @@ OverlayImpl::stop()
void
OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
{
beast::PropertyStream::Set set("traffic", stream);
auto const stats = m_traffic.getCounts();
for (auto const& pair : stats)
{
beast::PropertyStream::Map item(set);
item["category"] = pair.second.name;
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
item["messages_out"] = std::to_string(pair.second.messagesOut.load());
beast::PropertyStream::Set set("traffic", stream);
auto const stats = m_traffic.getCounts();
for (auto const& pair : stats)
{
beast::PropertyStream::Map item(set);
item["category"] = pair.second.name;
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
item["messages_out"] =
std::to_string(pair.second.messagesOut.load());
}
}
{
slots_.onWrite(stream);
}
}
@@ -1410,12 +1417,21 @@ OverlayImpl::squelch(
}
}
void
OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration)
{
for_each([&](std::shared_ptr<PeerImp>&& p) {
slots_.squelchValidator(validator, p->id());
p->send(makeSquelchMessage(validator, true, squelchDuration));
});
}
void
OverlayImpl::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
std::set<Peer::id_t>&& peers,
protocol::MessageType type)
bool isTrusted)
{
if (!slots_.baseSquelchReady())
return;
@@ -1423,14 +1439,22 @@ OverlayImpl::updateSlotAndSquelch(
if (!strand_.running_in_this_thread())
return post(
strand_,
[this, key, validator, peers = std::move(peers), type]() mutable {
updateSlotAndSquelch(key, validator, std::move(peers), type);
[this,
key,
validator,
peers = std::move(peers),
isTrusted]() mutable {
updateSlotAndSquelch(
key, validator, std::move(peers), isTrusted);
});
for (auto id : peers)
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
slots_.updateSlotAndSquelch(
key,
validator,
id,
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
isTrusted);
}
void
@@ -1438,17 +1462,39 @@ OverlayImpl::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer,
protocol::MessageType type)
bool isTrusted)
{
if (!slots_.baseSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(strand_, [this, key, validator, peer, type]() {
updateSlotAndSquelch(key, validator, peer, type);
return post(strand_, [this, key, validator, peer, isTrusted]() {
updateSlotAndSquelch(key, validator, peer, isTrusted);
});
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
slots_.updateSlotAndSquelch(
key,
validator,
peer,
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
isTrusted);
}
void
OverlayImpl::updateValidatorSlot(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer)
{
if (!slots_.enhancedSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(strand_, [this, key, validator, peer]() {
updateValidatorSlot(key, validator, peer);
});
slots_.updateValidatorSlot(key, validator, peer, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
}

View File

@@ -399,14 +399,14 @@ public:
* @param key Unique message's key
* @param validator Validator's public key
* @param peers Peers' id to update the slots for
* @param type Received protocol message type
* @param isTrusted Indicate if the validator is trusted
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
std::set<Peer::id_t>&& peers,
protocol::MessageType type);
bool isTrusted);
/** Overload to reduce allocation in case of single peer
*/
@@ -415,7 +415,22 @@ public:
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer,
protocol::MessageType type);
bool isTrusted);
/** Updates the slot information for an untrusted validator. If the
* untrusted validator was previously squelched, sends TMSquelch message to
* the sender of the message. If there are no untrusted slots available
* sends TMSquelch message to all peers to squelch messages from the
* validator.
* @param key Unique message's key
* @param validator Validator's public key
* @param peers Peers' id to update the slots for
*/
void
updateValidatorSlot(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer);
/** Called when the peer is deleted. If the peer was selected to be the
* source of messages from the validator then squelched peers have to be
@@ -451,6 +466,10 @@ private:
Peer::id_t const id,
std::uint32_t squelchDuration) const override;
void
squelchAll(PublicKey const& validator, std::uint32_t squelchDuration)
override;
void
unsquelch(PublicKey const& validator, Peer::id_t id) const override;

View File

@@ -1699,21 +1699,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
// suppression for 30 seconds to avoid doing a relatively expensive lookup
// every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped then
// this happens here I.e. before further wasting CPU verifying the signature
// of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
}
uint256 const proposeHash{set.currenttxhash()};
uint256 const prevLedger{set.previousledger()};
@@ -1728,7 +1713,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
publicKey.slice(),
sig);
if (auto [added, relayed] =
auto const isTrusted = app_.validators().trusted(publicKey);
if (auto const& [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
@@ -1736,7 +1723,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
// receives within IDLED seconds since the message has been relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
suppression, publicKey, id_, isTrusted);
// report duplicate proposal messages
overlay_.reportInboundTraffic(
@@ -1750,6 +1737,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
if (!isTrusted)
{
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug())
@@ -2358,20 +2355,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
auto const isTrusted =
app_.validators().trusted(val->getSignerPublic());
// If the operator has specified that untrusted validations be
// dropped then this happens here I.e. before further wasting CPU
// verifying the signature of an untrusted key
if (!isTrusted)
{
// increase untrusted validations received
overlay_.reportInboundTraffic(
TrafficCount::category::validation_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
return;
}
auto key = sha512Half(makeSlice(m->validation()));
auto [added, relayed] =
@@ -2384,7 +2367,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
key, val->getSignerPublic(), id_, isTrusted);
// increase duplicate validations received
overlay_.reportInboundTraffic(
@@ -2395,6 +2378,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
// at this point the message is guaranteed to be unique
if (!isTrusted)
{
overlay_.reportInboundTraffic(
TrafficCount::category::validation_untrusted,
Message::messageSize(*m));
overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_);
// If the operator has specified that untrusted validations be
// dropped then this happens here I.e. before further wasting CPU
// verifying the signature of an untrusted key
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
return;
}
if (!isTrusted && (tracking_.load() == Tracking::diverged))
{
JLOG(p_journal_.debug())
@@ -2415,6 +2414,23 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return ret;
}();
std::stringstream ss;
ss << "PEER_IMP_VALIDATION: "
"ledger_hash: "
<< val->getLedgerHash() << " is_trusted: " << isTrusted
<< " master_key: ";
auto master =
app_.validators().getTrustedKey(val->getSignerPublic());
if (master)
{
ss << toBase58(TokenType::NodePublic, *master);
}
else
{
ss << "none";
}
JLOG(p_journal_.debug()) << ss.str();
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
@@ -3008,7 +3024,7 @@ PeerImp::checkPropose(
peerPos.suppressionID(),
peerPos.publicKey(),
std::move(haveMessage),
protocol::mtPROPOSE_LEDGER);
isTrusted);
}
}
@@ -3044,7 +3060,7 @@ PeerImp::checkValidation(
key,
val->getSignerPublic(),
std::move(haveMessage),
protocol::mtVALIDATION);
val->isTrusted());
}
}
}