adds squelchStore

This commit is contained in:
Vito
2025-07-14 11:35:54 +02:00
parent 8087785204
commit 2ec5add603
8 changed files with 232 additions and 66 deletions

View File

@@ -23,7 +23,7 @@
#include <xrpld/overlay/Peer.h> #include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h> #include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/Slot.h> #include <xrpld/overlay/Slot.h>
#include <xrpld/overlay/Squelch.h> #include <xrpld/overlay/SquelchStore.h>
#include <xrpld/overlay/detail/Handshake.h> #include <xrpld/overlay/detail/Handshake.h>
#include <xrpl/basics/random.h> #include <xrpl/basics/random.h>
@@ -458,7 +458,7 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this<PeerSim>
{ {
public: public:
PeerSim(Overlay& overlay, beast::Journal journal) PeerSim(Overlay& overlay, beast::Journal journal)
: overlay_(overlay), squelch_(journal, overlay_.clock()) : overlay_(overlay), squelchStore_(journal, overlay_.clock())
{ {
id_ = sid_++; id_ = sid_++;
} }
@@ -483,7 +483,7 @@ public:
{ {
auto validator = m->getValidatorKey(); auto validator = m->getValidatorKey();
assert(validator); assert(validator);
if (!squelch_.expireSquelch(*validator)) if (squelchStore_.expireAndIsSquelched(*validator))
return; return;
overlay_.updateSlotAndSquelch({}, *validator, id(), f); overlay_.updateSlotAndSquelch({}, *validator, id(), f);
@@ -495,18 +495,17 @@ public:
{ {
auto validator = squelch.validatorpubkey(); auto validator = squelch.validatorpubkey();
PublicKey key(Slice(validator.data(), validator.size())); PublicKey key(Slice(validator.data(), validator.size()));
if (squelch.squelch()) squelchStore_.handleSquelch(
squelch_.addSquelch( key,
key, std::chrono::seconds{squelch.squelchduration()}); squelch.squelch(),
else std::chrono::seconds{squelch.squelchduration()});
squelch_.removeSquelch(key);
} }
private: private:
inline static Peer::id_t sid_ = 0; inline static Peer::id_t sid_ = 0;
Peer::id_t id_; Peer::id_t id_;
Overlay& overlay_; Overlay& overlay_;
reduce_relay::Squelch squelch_; reduce_relay::SquelchStore squelchStore_;
}; };
class OverlaySim : public Overlay, public reduce_relay::SquelchHandler class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
@@ -1159,7 +1158,6 @@ protected:
checkCounting(PublicKey const& validator, bool isCountingState) checkCounting(PublicKey const& validator, bool isCountingState)
{ {
auto countingState = network_.overlay().isCountingState(validator); auto countingState = network_.overlay().isCountingState(validator);
BEAST_EXPECT(countingState == isCountingState);
return countingState == isCountingState; return countingState == isCountingState;
} }
@@ -1210,7 +1208,7 @@ protected:
bool bool
propagateAndSquelch(bool log, bool purge = true) propagateAndSquelch(bool log, bool purge = true)
{ {
int n = 0; int squelchEvents = 0;
network_.propagate( network_.propagate(
[&](Link& link, MessageSPtr message) { [&](Link& link, MessageSPtr message) {
std::uint16_t squelched = 0; std::uint16_t squelched = 0;
@@ -1230,7 +1228,7 @@ protected:
env_.app() env_.app()
.config() .config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
n++; squelchEvents++;
} }
}, },
1, 1,
@@ -1240,10 +1238,11 @@ protected:
BEAST_EXPECT( BEAST_EXPECT(
selected.size() == selected.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(n == 1); // only one selection round
BEAST_EXPECT(squelchEvents == 1); // only one selection round
auto res = checkCounting(network_.validator(0), false); auto res = checkCounting(network_.validator(0), false);
BEAST_EXPECT(res); BEAST_EXPECT(res);
return n == 1 && res; return squelchEvents == 1 && res;
} }
/** Send fewer message so that squelch event is not generated */ /** Send fewer message so that squelch event is not generated */
@@ -1270,6 +1269,7 @@ protected:
nMessages, nMessages,
purge); purge);
auto res = checkCounting(network_.validator(0), countingState); auto res = checkCounting(network_.validator(0), countingState);
BEAST_EXPECT(res);
return !squelched && res; return !squelched && res;
} }

View File

@@ -0,0 +1,124 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/Env.h>
#include <xrpld/overlay/SquelchStore.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/protocol/PublicKey.h>
#include "xrpld/overlay/ReduceRelayCommon.h"
#include <chrono>
namespace ripple {
namespace test {
class squelch_store_test : public beast::unit_test::suite
{
using seconds = std::chrono::seconds;
public:
jtx::Env env_;
squelch_store_test() : env_(*this)
{
}
void
testHandleSquelch()
{
testcase("SquelchStore handleSquelch");
TestStopwatch clock;
auto store = reduce_relay::SquelchStore(env_.journal, clock);
auto const validator = randomKeyPair(KeyType::ed25519).first;
// attempt to squelch the peer with a too small duration
store.handleSquelch(
validator, true, reduce_relay::MIN_UNSQUELCH_EXPIRE - seconds{1});
// the peer must not be squelched
BEAST_EXPECTS(
!store.expireAndIsSquelched(validator), "peer is squelched");
// attempt to squelch the peer with a too big duration
store.handleSquelch(
validator,
true,
reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS + seconds{1});
// the peer must not be squelched
BEAST_EXPECTS(
!store.expireAndIsSquelched(validator), "peer is squelched");
// squelch the peer with a good duration
store.handleSquelch(
validator, true, reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1});
// the peer for the validator should be squelched
BEAST_EXPECTS(
store.expireAndIsSquelched(validator),
"peer and validator are not squelched");
// unsquelch the validator
store.handleSquelch(validator, false, seconds{0});
BEAST_EXPECTS(
!store.expireAndIsSquelched(validator), "peer is squelched");
}
void
testExpireAndIsSquelched()
{
testcase("SquelchStore expireAndIsSquelched");
TestStopwatch clock;
auto store = reduce_relay::SquelchStore(env_.journal, clock);
auto const validator = randomKeyPair(KeyType::ed25519).first;
auto const duration = reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1};
store.handleSquelch(
validator, true, reduce_relay::MIN_UNSQUELCH_EXPIRE + seconds{1});
BEAST_EXPECTS(
store.expireAndIsSquelched(validator),
"peer and validator are not squelched");
clock.advance(duration + seconds{1});
// the peer with short squelch duration must be not squelched
BEAST_EXPECTS(
!store.expireAndIsSquelched(validator),
"peer and validator are squelched");
}
void
run() override
{
testHandleSquelch();
testExpireAndIsSquelched();
}
};
BEAST_DEFINE_TESTSUITE(squelch_store, ripple_data, ripple);
} // namespace test
} // namespace ripple

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED #ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED #define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#include <xrpld/overlay/Peer.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h> #include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h> #include <xrpl/protocol/PublicKey.h>
@@ -30,45 +32,57 @@ namespace ripple {
namespace reduce_relay { namespace reduce_relay {
/** Maintains squelching of relaying messages from validators */ /** Tracks which validators were squelched.
class Squelch */
class SquelchStore
{ {
using clock_type = beast::abstract_clock<std::chrono::steady_clock>; using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
using time_point = typename clock_type::time_point; using time_point = typename clock_type::time_point;
public: public:
explicit Squelch(beast::Journal journal, clock_type& clock) explicit SquelchStore(beast::Journal journal, clock_type& clock)
: journal_(journal), clock_(clock) : journal_(journal), clock_(clock)
{ {
} }
virtual ~Squelch() = default; virtual ~SquelchStore() = default;
/** Handle a squelch request.
* @param validator The validator's public key.
* @param squelch Indicate if the validator should be squelched.
* @param duration Duration in seconds for how long to squelch the
* validator. Duration can be zero when squelch is false.
*/
void
handleSquelch(
PublicKey const& validator,
bool squelch,
std::chrono::seconds duration);
/** Check if a given validator is squelched. If the validator is no longer
* squelched, clear the squelch entry.
* @param validator Validator's public key
* @return true if squelch exists and it is not expired. False otherwise.
*/
bool
expireAndIsSquelched(PublicKey const& validator);
private:
/** Squelch validation/proposal relaying for the validator /** Squelch validation/proposal relaying for the validator
* @param validator The validator's public key * @param validator The validator's public key
* @param squelchDuration Squelch duration in seconds * @param squelchDuration Squelch duration in seconds
* @return false if invalid squelch duration * @return false if invalid squelch duration
*/ */
bool void
addSquelch( add(PublicKey const& validator,
PublicKey const& validator,
std::chrono::seconds const& squelchDuration); std::chrono::seconds const& squelchDuration);
/** Remove the squelch /** Remove the squelch for a validator
* @param validator The validator's public key * @param validator The validator's public key
*/ */
void void
removeSquelch(PublicKey const& validator); remove(PublicKey const& validator);
/** Remove expired squelch // holds a squelch expiration time_point for each validator
* @param validator Validator's public key
* @return true if removed or doesn't exist, false if still active
*/
bool
expireSquelch(PublicKey const& validator);
private:
/** Maintains the list of squelched relaying to downstream peers.
* Expiration time is included in the TMSquelch message. */
hash_map<PublicKey, time_point> squelched_; hash_map<PublicKey, time_point> squelched_;
beast::Journal const journal_; beast::Journal const journal_;
clock_type& clock_; clock_type& clock_;

View File

@@ -17,6 +17,8 @@
*/ */
//============================================================================== //==============================================================================
#include "xrpld/overlay/detail/OverlayImpl.h"
#include <xrpld/app/misc/HashRouter.h> #include <xrpld/app/misc/HashRouter.h>
#include <xrpld/app/misc/NetworkOPs.h> #include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/ValidatorList.h> #include <xrpld/app/misc/ValidatorList.h>
@@ -41,6 +43,7 @@
#include <boost/algorithm/string/predicate.hpp> #include <boost/algorithm/string/predicate.hpp>
#include "xrpld/overlay/Peer.h"
#include "xrpld/overlay/detail/TrafficCount.h" #include "xrpld/overlay/detail/TrafficCount.h"
#include <functional> #include <functional>

View File

@@ -44,6 +44,8 @@
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/container/flat_map.hpp> #include <boost/container/flat_map.hpp>
#include "xrpld/overlay/Peer.h"
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
@@ -104,7 +106,7 @@ private:
boost::asio::io_service& io_service_; boost::asio::io_service& io_service_;
std::optional<boost::asio::io_service::work> work_; std::optional<boost::asio::io_service::work> work_;
boost::asio::io_service::strand strand_; boost::asio::io_service::strand strand_;
mutable std::recursive_mutex mutex_; // VFALCO use std::mutex std::recursive_mutex mutable mutex_; // VFALCO use std::mutex
std::condition_variable_any cond_; std::condition_variable_any cond_;
std::weak_ptr<Timer> timer_; std::weak_ptr<Timer> timer_;
boost::container::flat_map<Child*, std::weak_ptr<Child>> list_; boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;

View File

@@ -45,6 +45,7 @@
#include <boost/beast/core/ostream.hpp> #include <boost/beast/core/ostream.hpp>
#include <algorithm> #include <algorithm>
#include <chrono>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <numeric> #include <numeric>
@@ -96,7 +97,7 @@ PeerImp::PeerImp(
, publicKey_(publicKey) , publicKey_(publicKey)
, lastPingTime_(clock_type::now()) , lastPingTime_(clock_type::now())
, creationTime_(clock_type::now()) , creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"), stopwatch()) , squelchStore_(app_.journal("SquelchStore"), stopwatch())
, usage_(consumer) , usage_(consumer)
, fee_{Resource::feeTrivialPeer, ""} , fee_{Resource::feeTrivialPeer, ""}
, slot_(slot) , slot_(slot)
@@ -247,8 +248,8 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (detaching_) if (detaching_)
return; return;
auto validator = m->getValidatorKey(); auto const validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator)) if (validator && squelchStore_.expireAndIsSquelched(*validator))
{ {
overlay_.reportOutboundTraffic( overlay_.reportOutboundTraffic(
TrafficCount::category::squelch_suppressed, TrafficCount::category::squelch_suppressed,
@@ -266,7 +267,7 @@ PeerImp::send(std::shared_ptr<Message> const& m)
TrafficCount::category::total, TrafficCount::category::total,
static_cast<int>(m->getBuffer(compressionEnabled_).size())); static_cast<int>(m->getBuffer(compressionEnabled_).size()));
auto sendq_size = send_queue_.size(); auto const sendq_size = send_queue_.size();
if (sendq_size < Tuning::targetSendQueue) if (sendq_size < Tuning::targetSendQueue)
{ {

View File

@@ -23,7 +23,7 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h> #include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/detail/LedgerReplayMsgHandler.h> #include <xrpld/app/ledger/detail/LedgerReplayMsgHandler.h>
#include <xrpld/app/misc/HashRouter.h> #include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/Squelch.h> #include <xrpld/overlay/SquelchStore.h>
#include <xrpld/overlay/detail/OverlayImpl.h> #include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/ProtocolVersion.h> #include <xrpld/overlay/detail/ProtocolVersion.h>
#include <xrpld/peerfinder/PeerfinderManager.h> #include <xrpld/peerfinder/PeerfinderManager.h>
@@ -116,7 +116,7 @@ private:
clock_type::time_point lastPingTime_; clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_; clock_type::time_point const creationTime_;
reduce_relay::Squelch squelch_; reduce_relay::SquelchStore squelchStore_;
// Notes on thread locking: // Notes on thread locking:
// //
@@ -680,7 +680,7 @@ PeerImp::PeerImp(
, publicKey_(publicKey) , publicKey_(publicKey)
, lastPingTime_(clock_type::now()) , lastPingTime_(clock_type::now())
, creationTime_(clock_type::now()) , creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"), stopwatch()) , squelchStore_(app_.journal("SquelchStore"), stopwatch())
, usage_(usage) , usage_(usage)
, fee_{Resource::feeTrivialPeer} , fee_{Resource::feeTrivialPeer}
, slot_(std::move(slot)) , slot_(std::move(slot))

View File

@@ -17,61 +17,83 @@
*/ */
//============================================================================== //==============================================================================
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h> #include <xrpld/overlay/ReduceRelayCommon.h>
#include <xrpld/overlay/Squelch.h> #include <xrpld/overlay/SquelchStore.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h> #include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/PublicKey.h> #include <xrpl/protocol/PublicKey.h>
#include <chrono> #include <chrono>
#include <unordered_map>
#include <utility>
namespace ripple { namespace ripple {
namespace reduce_relay { namespace reduce_relay {
bool void
Squelch::addSquelch( SquelchStore::handleSquelch(
PublicKey const& validator, PublicKey const& validator,
std::chrono::seconds const& squelchDuration) bool squelch,
std::chrono::seconds duration)
{ {
if (squelchDuration >= MIN_UNSQUELCH_EXPIRE && if (squelch)
squelchDuration <= MAX_UNSQUELCH_EXPIRE_PEERS)
{ {
squelched_[validator] = clock_.now() + squelchDuration; // This should never trigger. The squelh duration is validated in
return true; // PeerImp.onMessage(TMSquelch). However, if somehow invalid duration is
// passed, log is as an error
if ((duration < reduce_relay::MIN_UNSQUELCH_EXPIRE ||
duration > reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS))
{
JLOG(journal_.error())
<< "SquelchStore: invalid squelch duration validator: "
<< Slice(validator) << " duration: " << duration.count();
return;
}
add(validator, duration);
return;
} }
JLOG(journal_.error()) << "squelch: invalid squelch duration " remove(validator);
<< squelchDuration.count(); }
// unsquelch if invalid duration bool
removeSquelch(validator); SquelchStore::expireAndIsSquelched(PublicKey const& validator)
{
auto const now = clock_.now();
auto const it = squelched_.find(validator);
if (it == squelched_.end())
return false;
if (it->second > now)
return true;
// erase the entry if the squelch expired
squelched_.erase(it);
return false; return false;
} }
void void
Squelch::removeSquelch(PublicKey const& validator) SquelchStore::add(
PublicKey const& validator,
std::chrono::seconds const& duration)
{ {
squelched_.erase(validator); squelched_[validator] = clock_.now() + duration;
} }
bool void
Squelch::expireSquelch(PublicKey const& validator) SquelchStore::remove(PublicKey const& validator)
{ {
auto const now = clock_.now(); auto const it = squelched_.find(validator);
auto const& it = squelched_.find(validator);
if (it == squelched_.end()) if (it == squelched_.end())
return true; return;
else if (it->second > now)
return false;
// squelch expired
squelched_.erase(it); squelched_.erase(it);
return true;
} }
} // namespace reduce_relay } // namespace reduce_relay