adds logic to reset validator progress and better deletion safeguards

If the validator was idle for a short period, reset its progress. However, if the validator was idle for a long time, delete and squelch it.
Similarly, if the validator sent a lot of unique messages, but failed to reach peering constraints, squelch it.
This commit is contained in:
Vito
2025-07-01 15:01:45 +02:00
parent 8156a70f0e
commit eb805654e0
6 changed files with 198 additions and 46 deletions

View File

@@ -1107,7 +1107,7 @@ protected:
.count();
mustHandle = event.isSelected_ &&
d > milliseconds(reduce_relay::IDLED).count() &&
d > milliseconds(reduce_relay::PEER_IDLED).count() &&
network_.overlay().inState(
*event.key_, reduce_relay::PeerState::Squelched) >
0 &&
@@ -1129,7 +1129,7 @@ protected:
}
if (event.state_ == State::WaitReset ||
(event.state_ == State::On &&
(now - event.time_ > (reduce_relay::IDLED + seconds(2)))))
(now - event.time_ > (reduce_relay::PEER_IDLED + seconds(2)))))
{
bool handled =
event.state_ == State::WaitReset || !event.handled_;
@@ -1319,7 +1319,7 @@ protected:
network_.overlay().clock().advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true));
network_.overlay().clock().advance(
reduce_relay::IDLED + seconds(1));
reduce_relay::PEER_IDLED + seconds(1));
std::uint16_t unsquelched = 0;
network_.overlay().deleteIdlePeers(
[&](PublicKey const& key, PeerWPtr const& peer) {
@@ -1560,7 +1560,7 @@ vp_base_squelch_max_selected_peers=2
BEAST_EXPECT(peers[0].count == (nMessages - 1));
// advance the clock
network_.overlay().clock().advance(
reduce_relay::IDLED + seconds(1));
reduce_relay::PEER_IDLED + seconds(1));
network_.overlay().updateSlotAndSquelch(
key,
network_.validator(0),

View File

@@ -573,9 +573,9 @@ vp_enhanced_squelch_enable=0
}
void
testUpdateConsideredValidator_newValidator()
testUpdateConsideredValidator_new()
{
testcase("testUpdateConsideredValidator_newValidator");
testcase("testUpdateConsideredValidator_new");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
@@ -613,9 +613,9 @@ vp_enhanced_squelch_enable=0
}
void
testUpdateConsideredValidator_idleValidator()
testUpdateConsideredValidator_idle()
{
testcase("testUpdateConsideredValidator_idleValidator");
testcase("testUpdateConsideredValidator_idle");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
@@ -635,7 +635,7 @@ vp_enhanced_squelch_enable=0
auto const state = slots.getConsideredValidators().at(validator);
// simulate a validator sending a new message before the idle timer
stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1));
stopwatch.advance(reduce_relay::PEER_IDLED - std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
@@ -647,22 +647,17 @@ vp_enhanced_squelch_enable=0
"non-idling validator was updated");
// simulate a validator idling
stopwatch.advance(reduce_relay::IDLED + std::chrono::seconds(1));
stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
auto const idleState = slots.getConsideredValidators().at(validator);
// we expect that an idling validator will not be updated
BEAST_EXPECTS(
newState.count == idleState.count, "idling validator was updated");
}
void
testUpdateConsideredValidator_selectQualifyingValidator()
testUpdateConsideredValidator_selectQualifying()
{
testcase("testUpdateConsideredValidator_selectQualifyingValidator");
testcase("testUpdateConsideredValidator_selectQualifying");
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
@@ -686,7 +681,8 @@ vp_enhanced_squelch_enable=0
!slots.updateConsideredValidator(validator2, peerID),
"validator was selected before reaching message threshold");
stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1));
stopwatch.advance(
reduce_relay::PEER_IDLED - std::chrono::seconds(1));
}
// as long as the peer criteria are not met, the validator must not be
// selected
@@ -701,7 +697,8 @@ vp_enhanced_squelch_enable=0
!slots.updateConsideredValidator(validator2, i),
"validator was selected before reaching enough peers");
stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1));
stopwatch.advance(
reduce_relay::PEER_IDLED - std::chrono::seconds(1));
}
auto const consideredValidator =
@@ -721,9 +718,109 @@ vp_enhanced_squelch_enable=0
}
/** Exercises the "short idle" path for a considered (not yet selected)
 * untrusted validator: after more than PEER_IDLED without messages the
 * test expects deleteIdlePeers() to discard the accumulated message count
 * and peer set, so that the next message starts counting from one again.
 */
void
testCleanConsideredValidators_deleteIdleValidator()
testCleanConsideredValidators_resetIdle()
{
testcase("cleanConsideredValidators_deleteIdleValidator");
testcase("testCleanConsideredValidators_resetIdle");
auto const validator = randomKeyPair(KeyType::ed25519).first;
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), noop_handler, env_.app().config(), stopwatch);
// send enough messages for a slot to meet peer requirements
// (one message per peer id 0 .. VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS-1)
for (int i = 0;
i < env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i), validator, i);
// send enough messages from some peer to be one message away from
// meeting the selection criteria
// (all of these come from peer id 0; together with the loop above the
// total is MAX_MESSAGE_THRESHOLD - 1, which is asserted below)
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD -
(env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS +
1);
++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i), validator, 0);
// sanity-check the state built up so far before letting the validator idle
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).count ==
reduce_relay::MAX_MESSAGE_THRESHOLD - 1,
"considered validator information is in an invalid state");
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).peers.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
"considered validator information is in an invalid state");
// idle for just over PEER_IDLED, i.e. well below
// MAX_UNTRUSTED_VALIDATOR_IDLE
stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds{1});
// deleteIdlePeers must reset the progress of a validator that idled
slots.deleteIdlePeers();
// without the reset this message would bring the count up to
// MAX_MESSAGE_THRESHOLD
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(1), validator, 0);
// we expect that the validator was not selected
BEAST_EXPECTS(
slots.getSlots(false).size() == 0, "untrusted slot was created");
// only the single message sent after the reset is expected to be counted
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).count == 1,
"considered validator information is in an invalid state");
BEAST_EXPECTS(
slots.getConsideredValidators().at(validator).peers.size() == 1,
"considered validator information is in an invalid state");
}
void
testCleanConsideredValidators_deletePoorlyConnected()
{
testcase("cleanConsideredValidators_deletePoorlyConnected");
auto const validator = randomKeyPair(KeyType::ed25519).first;
Peer::id_t peerID = 0;
TestHandler handler{noop_handler};
// verify that squelchAll is called for poorly connected validator
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
std::uint32_t duration,
std::function<void(Peer::id_t)> callback) {
BEAST_EXPECTS(
actualKey == validator, "unexpected key passed to squelchAll");
callback(peerID);
};
TestStopwatch stopwatch;
EnhancedSquelchingTestSlots slots(
env_.app().logs(), handler, env_.app().config(), stopwatch);
// send enough messages from a single peer
for (int i = 0; i < 2 * reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
slots.updateUntrustedValidatorSlot(
sha512Half(validator) + static_cast<uint256>(i),
validator,
peerID);
stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds{1});
// deleteIdlePeers must squelch the validator as it failed to reach
// peering requirements
slots.deleteIdlePeers();
BEAST_EXPECTS(
slots.getConsideredValidators().size() == 0,
"poorly connected validator was not deleted");
}
void
testCleanConsideredValidators_deleteSilent()
{
testcase("cleanConsideredValidators_deleteSilent");
// insert some random validator key
auto const idleValidator = randomKeyPair(KeyType::ed25519).first;
auto const validator = randomKeyPair(KeyType::ed25519).first;
@@ -755,7 +852,9 @@ vp_enhanced_squelch_enable=0
"new validator was not added for consideration");
// simulate a validator idling
stopwatch.advance(reduce_relay::IDLED + std::chrono::seconds(1));
stopwatch.advance(
reduce_relay::MAX_UNTRUSTED_VALIDATOR_IDLE +
std::chrono::seconds(1));
BEAST_EXPECTS(
!slots.updateConsideredValidator(validator, peerID),
"validator was selected with insufficient number of peers");
@@ -825,10 +924,12 @@ private:
testDeleteIdlePeers_deleteIdleSlots();
testDeleteIdlePeers_deleteIdleUntrustedPeer();
testUpdateSlotAndSquelch_untrustedValidator();
testUpdateConsideredValidator_newValidator();
testUpdateConsideredValidator_idleValidator();
testUpdateConsideredValidator_selectQualifyingValidator();
testCleanConsideredValidators_deleteIdleValidator();
testUpdateConsideredValidator_new();
testUpdateConsideredValidator_idle();
testUpdateConsideredValidator_selectQualifying();
testCleanConsideredValidators_deleteSilent();
testCleanConsideredValidators_resetIdle();
testCleanConsideredValidators_deletePoorlyConnected();
}
};

View File

@@ -21,6 +21,7 @@
#define RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED
#include <chrono>
#include <cstdint>
namespace ripple {
@@ -39,21 +40,31 @@ static constexpr auto MIN_UNSQUELCH_EXPIRE = std::chrono::seconds{300};
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600};
static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10);
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600};
// No message received threshold before identifying a peer as idled
static constexpr auto IDLED = std::chrono::seconds{8};
static constexpr auto PEER_IDLED = std::chrono::seconds{8};
// Message count threshold to start selecting peers as the source
// of messages from the validator. We add peers who reach
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
// reach MAX_MESSAGE_THRESHOLD.
static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19;
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20;
// Max selected peers to choose as the source of messages from validator
static constexpr uint16_t MAX_SELECTED_PEERS = 5;
// Max number of untrusted slots the server will maintain
static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5;
// The maximum number of seconds an untrusted validator can go without
// sending a validation message. After this, a validator may be squelched.
static constexpr auto MAX_UNTRUSTED_VALIDATOR_IDLE = std::chrono::seconds{30};
// Wait before reduce-relay feature is enabled on boot up to let
// the server establish peer connections
static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};
// Maximum size of the aggregated transaction hashes per peer.
// Once we get to high tps throughput, this cap will prevent
// TMTransactions from exceeding the current protocol message

View File

@@ -148,8 +148,7 @@ public:
std::size_t timesSelected; // number of times the peer was selected
};
/** Get all peers of the slot. This methos is only to be used in
* unit-tests.
/** Get all peers of the slot.
*/
std::unordered_map<Peer::id_t, PeerInfo> const&
getPeers() const
@@ -520,10 +519,19 @@ protected:
struct ValidatorInfo
{
size_t count; // the number of messages sent from this validator
time_point lastMessage; // timestamp of the last message
std::unordered_set<Peer::id_t> peers; // a list of peer IDs that sent a
// message for this validator
// the number of messages sent from this validator
size_t count;
// timestamp of the last message
time_point lastMessage;
// a list of peer IDs that sent a message for this validator
std::unordered_set<Peer::id_t> peers;
/** Discard the progress this validator has accumulated: forget the peers
 * seen so far and zero the message counter. lastMessage is left untouched.
 */
void
reset()
{
    peers.clear();
    count = 0;
}
};
// Untrusted validators considered for open untrusted slots

View File

@@ -1722,7 +1722,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
{
// Count unique messages (Slots has its own 'HashRouter'), which a peer
// receives within PEER_IDLED since the message has been relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (relayed &&
(stopwatch().now() - *relayed) < reduce_relay::PEER_IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, isTrusted);
@@ -2366,7 +2367,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
// Count unique messages (Slots has its own 'HashRouter'), which a
// peer receives within PEER_IDLED since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (relayed &&
(stopwatch().now() - *relayed) < reduce_relay::PEER_IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, isTrusted);

View File

@@ -521,10 +521,6 @@ Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer)
return std::nullopt;
}
// the validator idled. Don't update it, it will be cleaned later
if (now - it->second.lastMessage > IDLED)
return std::nullopt;
it->second.peers.insert(peer);
it->second.lastMessage = now;
++it->second.count;
@@ -562,19 +558,32 @@ Slots::deleteIdlePeers()
for (auto it = slots.begin(); it != slots.end();)
{
it->second.deleteIdlePeer(it->first);
if (now - it->second.getLastSelected() >
MAX_UNSQUELCH_EXPIRE_DEFAULT)
auto const& validator = it->first;
auto& slot = it->second;
slot.deleteIdlePeer(validator);
// delete the slot if the untrusted slot no longer meets the
// selection criteria or it has not been selected for a while
if ((!slot.isTrusted_ &&
slot.getPeers().size() < maxSelectedPeers_) ||
now - it->second.getLastSelected() >
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT)
{
JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot "
<< Slice(it->first);
JLOG(journal_.trace())
<< "deleteIdlePeers: deleting "
<< (slot.isTrusted_ ? "trusted" : "untrusted") << " slot "
<< Slice(it->first) << " reason: "
<< (now - it->second.getLastSelected() >
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT
? " inactive "
: " insufficient peers");
// if an untrusted validator slot idled - peers stopped
// sending messages for this validator squelch it
if (!it->second.isTrusted_)
handler_.squelchAll(
it->first,
MAX_UNSQUELCH_EXPIRE_DEFAULT.count(),
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count(),
[&](Peer::id_t id) {
registerSquelchedValidator(it->first, id);
});
@@ -608,11 +617,32 @@ Slots::cleanConsideredValidators()
for (auto it = consideredValidators_.begin();
it != consideredValidators_.end();)
{
if (now - it->second.lastMessage > IDLED)
// this is a safety check for validators that have
// sent a lot of validations via limited number of peers
if (it->second.count > 2 * reduce_relay::MAX_MESSAGE_THRESHOLD &&
it->second.peers.size() < maxSelectedPeers_)
{
JLOG(journal_.warn())
<< "cleanConsideredValidators: removing "
"validator "
<< Slice(it->first) << " with insufficient peers";
keys.push_back(it->first);
it = consideredValidators_.erase(it);
}
else if (
now - it->second.lastMessage >
reduce_relay::MAX_UNTRUSTED_VALIDATOR_IDLE)
{
keys.push_back(it->first);
it = consideredValidators_.erase(it);
}
// Due to some reason the validator idled, reset their progress
else if (now - it->second.lastMessage > reduce_relay::PEER_IDLED)
{
it->second.reset();
++it;
}
else
++it;
}