diff --git a/src/xrpld/overlay/ReduceRelayCommon.h b/src/xrpld/overlay/ReduceRelayCommon.h index 473e5d1527..efcb0f9690 100644 --- a/src/xrpld/overlay/ReduceRelayCommon.h +++ b/src/xrpld/overlay/ReduceRelayCommon.h @@ -49,6 +49,8 @@ static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19; static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20; // Max selected peers to choose as the source of messages from validator static constexpr uint16_t MAX_SELECTED_PEERS = 5; +// Max number of untrusted slots the server will maintain +static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5; // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index cce4fa99f4..d730ad9c70 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -583,6 +583,7 @@ class Slots final std::unordered_set, clock_type, hardened_hash>; + using slots_map = hash_map>; public: /** @@ -590,14 +591,17 @@ public: * @param handler Squelch/unsquelch implementation * @param config reference to the global config */ - Slots(Logs& logs, SquelchHandler const& handler, Config const& config) + Slots(Logs& logs, SquelchHandler& handler, Config const& config) : handler_(handler) , logs_(logs) , journal_(logs.journal("Slots")) , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE) , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS) + , enhancedSquelchEnabled_( + config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE) { } + ~Slots() = default; /** Check if base squelching feature is enabled and ready */ @@ -607,6 +611,13 @@ public: return baseSquelchEnabled_ && reduceRelayReady(); } + /** Check if enhanced squelching feature is enabled and ready */ + bool + enhancedSquelchReady() + { + return enhancedSquelchEnabled_ && reduceRelayReady(); + } + /** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */ bool reduceRelayReady() @@ -619,6 +630,32 @@ public: return reduceRelayReady_; } + /** Updates untrusted validator slot. Do not call for trusted + * validators. The caller must ensure passed messages are unique. + * @param key Message hash + * @param validator Validator public key + * @param id The ID of the peer that sent the message + */ + void + updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id) + { + updateValidatorSlot(key, validator, id, []() {}); + } + + /** Updates untrusted validator slot. Do not call for trusted + * validators. The caller must ensure passed messages are unique. + * @param key Message hash + * @param validator Validator public key + * @param id The ID of the peer that sent the message + * @param callback A callback to report ignored validations + */ + void + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + id_t id, + typename Slot::ignored_squelch_callback callback); + /** Calls Slot::update of Slot associated with the validator, with a * noop callback. * @param key Message's hash @@ -758,7 +795,8 @@ private: /** * Updates the last message sent from a validator. - * @param validator the validator public kety + * @param validator The validator public key + * @param peer The peer ID sending the message * @return true if the validator was updated, false otherwise */ std::optional @@ -808,13 +846,16 @@ private: std::atomic_bool reduceRelayReady_{false}; - hash_map> slots_; - SquelchHandler const& handler_; // squelch/unsquelch handler + slots_map slots_; + slots_map untrusted_slots_; + + SquelchHandler& handler_; // squelch/unsquelch handler Logs& logs_; beast::Journal const journal_; bool const baseSquelchEnabled_; uint16_t const maxSelectedPeers_; + bool const enhancedSquelchEnabled_; // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message @@ -948,50 +989,152 @@ Slots::updateSlotAndSquelch( if (!addPeerMessage(key, id)) return; - auto it = slots_.find(validator); - if (it == slots_.end()) + // If we receive a message from a trusted validator either update an + // existing slot or insert a new one. If we are not running enhanced + // squelching also deduplicate untrusted validator messages + if (isTrusted || !enhancedSquelchEnabled_) { JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator); - auto it = - slots_ - .emplace(std::make_pair( - validator, - Slot( - handler_, logs_.journal("Slot"), maxSelectedPeers_))) - .first; - it->second.update(validator, id, type, callback); + auto it = slots_ + .emplace(std::make_pair( + validator, + Slot( + handler_, + logs_.journal("Slot"), + maxSelectedPeers_, + isTrusted))) + .first; + it->second.update(validator, id, callback); } else - it->second.update(validator, id, type, callback); + { + auto it = untrusted_slots_.find(validator); + // If we received a message from a validator that is not + // selected, and is not squelched, there is nothing to do. It + // will be squelched later when `updateValidatorSlot` is called. + if (it == untrusted_slots_.end()) + return; + + it->second.update(validator, id, callback); + } +} + +template +void +Slots::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + id_t id, + typename Slot::ignored_squelch_callback callback) +{ + // We received a message from an already selected validator + // we can ignore this message + if (untrusted_slots_.find(validator) != untrusted_slots_.end()) + return; + + // We received a message from an already squelched validator. + // This could happen in few cases: + // 1. It happened so that the squelch for a particular peer expired + // before our local squelch. + // 2. We receive a message from a new peer that did not receive the + // squelch request. + // 3. The peer is ignoring our squelch request and we have not sent + // the controll message in a while. + // In all of these cases we can only send them a squelch request again. + if (validatorSquelched(validator)) + { + if (!peerSquelched(validator, id)) + { + squelchValidator(validator, id); + handler_.squelch( + validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + } + return; + } + + // update a slot if the message is from a selected untrusted validator + if (auto const& it = untrusted_slots_.find(validator); + it != untrusted_slots_.end()) + { + it->second.update(validator, id, callback); + return; + } + + // Do we have any available slots for additional untrusted validators? + // This could happen in few cases: + // 1. We received a message from a new untrusted validator, but we + // are at capacity. + // 2. We received a message from a previously squelched validator. + // In all of these cases we send a squelch message to all peers. + // The validator may still be considered by the selector. However, it + // will be eventually cleaned and squelched + if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS) + { + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + return; + } + + if (auto const v = updateConsideredValidator(validator, id)) + untrusted_slots_.emplace(std::make_pair( + *v, + Slot( + handler_, logs_.journal("Slot"), maxSelectedPeers_, false))); + // When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them. + // Since we stop updating their counters, they will idle, and will be + // removed and squelched. } template void Slots::deletePeer(id_t id, bool erase) { - for (auto& [validator, slot] : slots_) - slot.deletePeer(validator, id, erase); + auto deletePeer = [&](slots_map& slots) { + for (auto& [validator, slot] : slots) + slot.deletePeer(validator, id, erase); + }; + + deletePeer(slots_); + deletePeer(untrusted_slots_); } template void Slots::deleteIdlePeers() { - auto now = clock_type::now(); + auto deleteSlots = [&](slots_map& slots) { + auto const now = clock_type::now(); - for (auto it = slots_.begin(); it != slots_.end();) - { - it->second.deleteIdlePeer(it->first); - if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT) + for (auto it = slots.begin(); it != slots.end();) { - JLOG(journal_.trace()) - << "deleteIdlePeers: deleting idle slot " << Slice(it->first); - it = slots_.erase(it); + it->second.deleteIdlePeer(it->first); + if (now - it->second.getLastSelected() > + MAX_UNSQUELCH_EXPIRE_DEFAULT) + { + JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " + << Slice(it->first); + + // if an untrusted validator slot idled - peers stopped + // sending messages for this validator squelch it + if (!it->second.isTrusted_) + handler_.squelchAll( + it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + + it = slots.erase(it); + } + else + ++it; } - else - ++it; - } + }; + + deleteSlots(slots_); + deleteSlots(untrusted_slots_); + + // remove and squelch all validators that the selector deemed unsuitable + // there might be some good validators in this set that "lapsed". + // However, since these are untrusted validators we're not concerned + for (auto const& validator : cleanConsideredValidators()) + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } } // namespace reduce_relay diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index d79ec621f5..3da627fea5 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1473,6 +1473,25 @@ OverlayImpl::updateSlotAndSquelch( isTrusted); } +void +OverlayImpl::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer) +{ + if (!slots_.enhancedSquelchReady()) + return; + + if (!strand_.running_in_this_thread()) + return post(strand_, [this, key, validator, peer]() { + updateValidatorSlot(key, validator, peer); + }); + + slots_.updateValidatorSlot(key, validator, peer, [&]() { + reportInboundTraffic(TrafficCount::squelch_ignored, 0); + }); +} + void OverlayImpl::deletePeer(Peer::id_t id) { diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index b27d4a2570..0bd3fafcdb 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -417,6 +417,21 @@ public: Peer::id_t peer, bool isTrusted); + /** Updates the slot information for an untrusted validator. If the + * untrusted validator was previously squelched, sends TMSquelch message to + * the sender of the message. If there are no untrusted slots available + * sends TMSquelch message to all peers to squelch messages from the + * validator. + * @param key Unique message's key + * @param validator Validator's public key + * @param peers Peers' id to update the slots for + */ + void + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer); + /** Called when the peer is deleted. If the peer was selected to be the * source of messages from the validator then squelched peers have to be * unsquelched. diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 92b9a40c30..6c8af5f6ac 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -2385,6 +2385,8 @@ PeerImp::onMessage(std::shared_ptr const& m) TrafficCount::category::validation_untrusted, Message::messageSize(*m)); + overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_); + // If the operator has specified that untrusted validations be // dropped then this happens here I.e. before further wasting CPU // verifying the signature of an untrusted key