From cf4f95a9c9eadfb60b7372d9c99a28ed1b3b9a72 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:29:33 +0200 Subject: [PATCH] feature: extend squelching to suppress untrusted validator traffic This feature improves network efficiency by limiting message propagation from untrusted validators. Squelching currently reduces the volume of duplicate messages from validators but does not address the volume of unique messages from untrusted validators, who may not contribute meaningfully to network progress. This change introduces a bounded number of slots for untrusted validators, selected based on message frequency. Once selected, their duplicate messages are subject to standard squelching logic, thereby reducing overall message overhead without impacting trusted validator performance. --- src/xrpld/overlay/ReduceRelayCommon.h | 2 + src/xrpld/overlay/Slot.h | 199 +++++++++++++++++++---- src/xrpld/overlay/detail/OverlayImpl.cpp | 19 +++ src/xrpld/overlay/detail/OverlayImpl.h | 15 ++ src/xrpld/overlay/detail/PeerImp.cpp | 2 + 5 files changed, 209 insertions(+), 28 deletions(-) diff --git a/src/xrpld/overlay/ReduceRelayCommon.h b/src/xrpld/overlay/ReduceRelayCommon.h index 473e5d1527..efcb0f9690 100644 --- a/src/xrpld/overlay/ReduceRelayCommon.h +++ b/src/xrpld/overlay/ReduceRelayCommon.h @@ -49,6 +49,8 @@ static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19; static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20; // Max selected peers to choose as the source of messages from validator static constexpr uint16_t MAX_SELECTED_PEERS = 5; +// Max number of untrusted slots the server will maintain +static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5; // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index cce4fa99f4..d730ad9c70 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -583,6 +583,7 @@ class Slots final std::unordered_set, clock_type, hardened_hash>; + using slots_map = hash_map>; public: /** @@ -590,14 +591,17 @@ public: * @param handler Squelch/unsquelch implementation * @param config reference to the global config */ - Slots(Logs& logs, SquelchHandler const& handler, Config const& config) + Slots(Logs& logs, SquelchHandler& handler, Config const& config) : handler_(handler) , logs_(logs) , journal_(logs.journal("Slots")) , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE) , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS) + , enhancedSquelchEnabled_( + config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE) { } + ~Slots() = default; /** Check if base squelching feature is enabled and ready */ @@ -607,6 +611,13 @@ public: return baseSquelchEnabled_ && reduceRelayReady(); } + /** Check if enhanced squelching feature is enabled and ready */ + bool + enhancedSquelchReady() + { + return enhancedSquelchEnabled_ && reduceRelayReady(); + } + /** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */ bool reduceRelayReady() @@ -619,6 +630,32 @@ public: return reduceRelayReady_; } + /** Updates untrusted validator slot. Do not call for trusted + * validators. The caller must ensure passed messages are unique. + * @param key Message hash + * @param validator Validator public key + * @param id The ID of the peer that sent the message + */ + void + updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id) + { + updateValidatorSlot(key, validator, id, []() {}); + } + + /** Updates untrusted validator slot. Do not call for trusted + * validators. The caller must ensure passed messages are unique. + * @param key Message hash + * @param validator Validator public key + * @param id The ID of the peer that sent the message + * @param callback A callback to report ignored validations + */ + void + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + id_t id, + typename Slot::ignored_squelch_callback callback); + /** Calls Slot::update of Slot associated with the validator, with a * noop callback. * @param key Message's hash @@ -758,7 +795,8 @@ private: /** * Updates the last message sent from a validator. - * @param validator the validator public kety + * @param validator The validator public key + * @param peer The peer ID sending the message * @return true if the validator was updated, false otherwise */ std::optional @@ -808,13 +846,16 @@ private: std::atomic_bool reduceRelayReady_{false}; - hash_map> slots_; - SquelchHandler const& handler_; // squelch/unsquelch handler + slots_map slots_; + slots_map untrusted_slots_; + + SquelchHandler& handler_; // squelch/unsquelch handler Logs& logs_; beast::Journal const journal_; bool const baseSquelchEnabled_; uint16_t const maxSelectedPeers_; + bool const enhancedSquelchEnabled_; // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message @@ -948,50 +989,152 @@ Slots::updateSlotAndSquelch( if (!addPeerMessage(key, id)) return; - auto it = slots_.find(validator); - if (it == slots_.end()) + // If we receive a message from a trusted validator either update an + // existing slot or insert a new one. If we are not running enhanced + // squelching also deduplicate untrusted validator messages + if (isTrusted || !enhancedSquelchEnabled_) { JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator); - auto it = - slots_ - .emplace(std::make_pair( - validator, - Slot( - handler_, logs_.journal("Slot"), maxSelectedPeers_))) - .first; - it->second.update(validator, id, type, callback); + auto it = slots_ + .emplace(std::make_pair( + validator, + Slot( + handler_, + logs_.journal("Slot"), + maxSelectedPeers_, + isTrusted))) + .first; + it->second.update(validator, id, callback); } else - it->second.update(validator, id, type, callback); + { + auto it = untrusted_slots_.find(validator); + // If we received a message from a validator that is not + // selected, and is not squelched, there is nothing to do. It + // will be squelched later when `updateValidatorSlot` is called. + if (it == untrusted_slots_.end()) + return; + + it->second.update(validator, id, callback); + } +} + +template +void +Slots::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + id_t id, + typename Slot::ignored_squelch_callback callback) +{ + // We received a message from an already selected validator + // we can ignore this message + if (untrusted_slots_.find(validator) != untrusted_slots_.end()) + return; + + // We received a message from an already squelched validator. + // This could happen in few cases: + // 1. It happened so that the squelch for a particular peer expired + // before our local squelch. + // 2. We receive a message from a new peer that did not receive the + // squelch request. + // 3. The peer is ignoring our squelch request and we have not sent + // the controll message in a while. + // In all of these cases we can only send them a squelch request again. + if (validatorSquelched(validator)) + { + if (!peerSquelched(validator, id)) + { + squelchValidator(validator, id); + handler_.squelch( + validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + } + return; + } + + // update a slot if the message is from a selected untrusted validator + if (auto const& it = untrusted_slots_.find(validator); + it != untrusted_slots_.end()) + { + it->second.update(validator, id, callback); + return; + } + + // Do we have any available slots for additional untrusted validators? + // This could happen in few cases: + // 1. We received a message from a new untrusted validator, but we + // are at capacity. + // 2. We received a message from a previously squelched validator. + // In all of these cases we send a squelch message to all peers. + // The validator may still be considered by the selector. However, it + // will be eventually cleaned and squelched + if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS) + { + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + return; + } + + if (auto const v = updateConsideredValidator(validator, id)) + untrusted_slots_.emplace(std::make_pair( + *v, + Slot( + handler_, logs_.journal("Slot"), maxSelectedPeers_, false))); + // When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them. + // Since we stop updating their counters, they will idle, and will be + // removed and squelched. } template void Slots::deletePeer(id_t id, bool erase) { - for (auto& [validator, slot] : slots_) - slot.deletePeer(validator, id, erase); + auto deletePeer = [&](slots_map& slots) { + for (auto& [validator, slot] : slots) + slot.deletePeer(validator, id, erase); + }; + + deletePeer(slots_); + deletePeer(untrusted_slots_); } template void Slots::deleteIdlePeers() { - auto now = clock_type::now(); + auto deleteSlots = [&](slots_map& slots) { + auto const now = clock_type::now(); - for (auto it = slots_.begin(); it != slots_.end();) - { - it->second.deleteIdlePeer(it->first); - if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT) + for (auto it = slots.begin(); it != slots.end();) { - JLOG(journal_.trace()) - << "deleteIdlePeers: deleting idle slot " << Slice(it->first); - it = slots_.erase(it); + it->second.deleteIdlePeer(it->first); + if (now - it->second.getLastSelected() > + MAX_UNSQUELCH_EXPIRE_DEFAULT) + { + JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " + << Slice(it->first); + + // if an untrusted validator slot idled - peers stopped + // sending messages for this validator squelch it + if (!it->second.isTrusted_) + handler_.squelchAll( + it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + + it = slots.erase(it); + } + else + ++it; } - else - ++it; - } + }; + + deleteSlots(slots_); + deleteSlots(untrusted_slots_); + + // remove and squelch all validators that the selector deemed unsuitable + // there might be some good validators in this set that "lapsed". + // However, since these are untrusted validators we're not concerned + for (auto const& validator : cleanConsideredValidators()) + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } } // namespace reduce_relay diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index d79ec621f5..3da627fea5 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1473,6 +1473,25 @@ OverlayImpl::updateSlotAndSquelch( isTrusted); } +void +OverlayImpl::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer) +{ + if (!slots_.enhancedSquelchReady()) + return; + + if (!strand_.running_in_this_thread()) + return post(strand_, [this, key, validator, peer]() { + updateValidatorSlot(key, validator, peer); + }); + + slots_.updateValidatorSlot(key, validator, peer, [&]() { + reportInboundTraffic(TrafficCount::squelch_ignored, 0); + }); +} + void OverlayImpl::deletePeer(Peer::id_t id) { diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index b27d4a2570..0bd3fafcdb 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -417,6 +417,21 @@ public: Peer::id_t peer, bool isTrusted); + /** Updates the slot information for an untrusted validator. If the + * untrusted validator was previously squelched, sends TMSquelch message to + * the sender of the message. If there are no untrusted slots available + * sends TMSquelch message to all peers to squelch messages from the + * validator. + * @param key Unique message's key + * @param validator Validator's public key + * @param peers Peers' id to update the slots for + */ + void + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer); + /** Called when the peer is deleted. If the peer was selected to be the * source of messages from the validator then squelched peers have to be * unsquelched. diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 92b9a40c30..6c8af5f6ac 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -2385,6 +2385,8 @@ PeerImp::onMessage(std::shared_ptr const& m) TrafficCount::category::validation_untrusted, Message::messageSize(*m)); + overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_); + // If the operator has specified that untrusted validations be // dropped then this happens here I.e. before further wasting CPU // verifying the signature of an untrusted key