feat: improve squelching configuration (#5438)

This commit introduces the following changes:
* Renames `vp_enable config` option to `vp_base_squelch_enable` to enable squelching for validators.
* Removes `vp_squelch` config option which was used to configure whether to send squelch messages to peers or not. With this flag removed, if squelching is enabled, squelch messages will be sent. This was an option used for debugging.
* Introduces a temporary `vp_base_squelch_max_trusted_peers` config option to change the max number of peers who are selected as validator message sources. This is a temporary option, which will be removed once a good value is found.
* Adds a traffic counter to count the number of times peers ignored squelch messages and kept sending messages for squelched validators.
* Moves the decision whether squelching is enabled and ready into Slot.h.
This commit is contained in:
Vito Tumas
2025-05-28 12:30:03 +02:00
committed by GitHub
parent be668ee26d
commit d71ce51901
12 changed files with 387 additions and 135 deletions

View File

@@ -473,17 +473,14 @@ public:
Config c;
std::stringstream str;
str << "[reduce_relay]\n"
<< "vp_enable=1\n"
<< "vp_squelch=1\n"
<< "vp_base_squelch_enable=1\n"
<< "[compression]\n"
<< enable << "\n";
c.loadFromString(str.str());
auto env = std::make_shared<jtx::Env>(*this);
env->app().config().COMPRESSION = c.COMPRESSION;
env->app().config().VP_REDUCE_RELAY_ENABLE =
c.VP_REDUCE_RELAY_ENABLE;
env->app().config().VP_REDUCE_RELAY_SQUELCH =
c.VP_REDUCE_RELAY_SQUELCH;
env->app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE;
return env;
};
auto handshake = [&](int outboundEnable, int inboundEnable) {
@@ -496,7 +493,7 @@ public:
env->app().config().COMPRESSION,
false,
env->app().config().TX_REDUCE_RELAY_ENABLE,
env->app().config().VP_REDUCE_RELAY_ENABLE);
env->app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/Env.h>
#include <xrpld/overlay/Message.h>
@@ -32,6 +33,8 @@
#include <boost/thread.hpp>
#include <chrono>
#include <iostream>
#include <numeric>
#include <optional>
@@ -517,7 +520,8 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
public:
using id_t = Peer::id_t;
using clock_type = ManualClock;
OverlaySim(Application& app) : slots_(app.logs(), *this), logs_(app.logs())
OverlaySim(Application& app)
: slots_(app.logs(), *this, app.config()), logs_(app.logs())
{
}
@@ -986,7 +990,10 @@ protected:
network_.overlay().isCountingState(validator);
BEAST_EXPECT(
countingState == false &&
selected.size() == reduce_relay::MAX_SELECTED_PEERS);
selected.size() ==
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
}
// Trigger Link Down or Peer Disconnect event
@@ -1188,7 +1195,10 @@ protected:
{
BEAST_EXPECT(
squelched ==
MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
MAX_PEERS -
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
n++;
}
},
@@ -1197,7 +1207,9 @@ protected:
purge,
resetClock);
auto selected = network_.overlay().getSelected(network_.validator(0));
BEAST_EXPECT(selected.size() == reduce_relay::MAX_SELECTED_PEERS);
BEAST_EXPECT(
selected.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(n == 1); // only one selection round
auto res = checkCounting(network_.validator(0), false);
BEAST_EXPECT(res);
@@ -1261,7 +1273,11 @@ protected:
unsquelched++;
});
BEAST_EXPECT(
unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
unsquelched ==
MAX_PEERS -
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(checkCounting(network_.validator(0), true));
});
}
@@ -1282,7 +1298,11 @@ protected:
});
auto peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(
unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
unsquelched ==
MAX_PEERS -
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(checkCounting(network_.validator(0), true));
});
}
@@ -1314,42 +1334,164 @@ protected:
void
testConfig(bool log)
{
doTest("Config Test", log, [&](bool log) {
doTest("Test Config - squelch enabled (legacy)", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enable=1
vp_squelch=1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENABLE == true);
BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH == true);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true);
});
doTest("Test Config - squelch disabled (legacy)", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enable=0
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
Config c1;
toLoad = (R"rippleConfig(
toLoad = R"rippleConfig(
[reduce_relay]
vp_enable=0
vp_squelch=0
)rippleConfig");
)rippleConfig";
c1.loadFromString(toLoad);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_ENABLE == false);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH == false);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
});
doTest("Test Config - squelch enabled", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true);
});
doTest("Test Config - squelch disabled", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=0
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
});
doTest("Test Config - legacy and new", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=0
vp_enable=0
)rippleConfig");
std::string error;
auto const expectedError =
"Invalid reduce_relay"
" cannot specify both vp_base_squelch_enable and vp_enable "
"options. "
"vp_enable was deprecated and replaced by "
"vp_base_squelch_enable";
try
{
c.loadFromString(toLoad);
}
catch (std::runtime_error& e)
{
error = e.what();
}
BEAST_EXPECT(error == expectedError);
});
doTest("Test Config - max selected peers", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 5);
Config c1;
toLoad = R"rippleConfig(
[reduce_relay]
vp_base_squelch_max_selected_peers=6
)rippleConfig";
c1.loadFromString(toLoad);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 6);
Config c2;
toLoad = R"rippleConfig(
[reduce_relay]
vp_enabled=1
vp_squelched=1
vp_base_squelch_max_selected_peers=2
)rippleConfig";
c2.loadFromString(toLoad);
BEAST_EXPECT(c2.VP_REDUCE_RELAY_ENABLE == false);
BEAST_EXPECT(c2.VP_REDUCE_RELAY_SQUELCH == false);
std::string error;
auto const expectedError =
"Invalid reduce_relay"
" vp_base_squelch_max_selected_peers must be "
"greater than or equal to 3";
try
{
c2.loadFromString(toLoad);
}
catch (std::runtime_error& e)
{
error = e.what();
}
BEAST_EXPECT(error == expectedError);
});
}
void
testBaseSquelchReady(bool log)
{
doTest("BaseSquelchReady", log, [&](bool log) {
ManualClock::reset();
auto createSlots = [&](bool baseSquelchEnabled)
-> reduce_relay::Slots<ManualClock> {
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
baseSquelchEnabled;
return reduce_relay::Slots<ManualClock>(
env_.app().logs(), network_.overlay(), env_.app().config());
};
// base squelching must not be ready if squelching is disabled
BEAST_EXPECT(!createSlots(false).baseSquelchReady());
// base squelch must not be ready as not enough time passed from
// bootup
BEAST_EXPECT(!createSlots(true).baseSquelchReady());
ManualClock::advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1});
// base squelch enabled and bootup time passed
BEAST_EXPECT(createSlots(true).baseSquelchReady());
// even if time passed, base squelching must not be ready if turned
// off in the config
BEAST_EXPECT(!createSlots(false).baseSquelchReady());
});
}
@@ -1425,7 +1567,7 @@ vp_squelched=1
auto run = [&](int npeers) {
handler.maxDuration_ = 0;
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler);
env_.app().logs(), handler, env_.app().config());
// 1st message from a new peer switches the slot
// to counting state and resets the counts of all peers +
// MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold
@@ -1503,14 +1645,12 @@ vp_squelched=1
std::stringstream str;
str << "[reduce_relay]\n"
<< "vp_enable=" << enable << "\n"
<< "vp_squelch=" << enable << "\n"
<< "[compression]\n"
<< "1\n";
c.loadFromString(str.str());
env_.app().config().VP_REDUCE_RELAY_ENABLE =
c.VP_REDUCE_RELAY_ENABLE;
env_.app().config().VP_REDUCE_RELAY_SQUELCH =
c.VP_REDUCE_RELAY_SQUELCH;
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE;
env_.app().config().COMPRESSION = c.COMPRESSION;
};
auto handshake = [&](int outboundEnable, int inboundEnable) {
@@ -1523,7 +1663,7 @@ vp_squelched=1
env_.app().config().COMPRESSION,
false,
env_.app().config().TX_REDUCE_RELAY_ENABLE,
env_.app().config().VP_REDUCE_RELAY_ENABLE);
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();
@@ -1563,7 +1703,13 @@ vp_squelched=1
Network network_;
public:
reduce_relay_test() : env_(*this), network_(env_.app())
reduce_relay_test()
: env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
return cfg;
}))
, network_(env_.app())
{
}
@@ -1582,6 +1728,7 @@ public:
testInternalHashRouter(log);
testRandomSquelch(log);
testHandshake(log);
testBaseSquelchReady(log);
}
};

View File

@@ -242,19 +242,18 @@ public:
// size, but we allow admins to explicitly set it in the config.
std::optional<int> SWEEP_INTERVAL;
// Reduce-relay - these parameters are experimental.
// Enable reduce-relay features
// Validation/proposal reduce-relay feature
bool VP_REDUCE_RELAY_ENABLE = false;
// Send squelch message to peers. Generally this config should
// have the same value as VP_REDUCE_RELAY_ENABLE. It can be
// used for testing the feature's function without
// affecting the message relaying. To use it for testing,
// set it to false and set VP_REDUCE_RELAY_ENABLE to true.
// Squelch messages will not be sent to the peers in this case.
// Set log level to debug so that the feature function can be
// analyzed.
bool VP_REDUCE_RELAY_SQUELCH = false;
// Reduce-relay - Experimental parameters to control p2p routing algorithms
// Enable base squelching of duplicate validation/proposal messages
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = false;
///////////////////// !!TEMPORARY CODE BLOCK!! ////////////////////////
// Temporary squelching config for the peers selected as a source of //
// validator messages. The config must be removed once squelching is //
// made the default routing algorithm //
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5;
///////////////// END OF TEMPORARY CODE BLOCK /////////////////////
// Transaction reduce-relay feature
bool TX_REDUCE_RELAY_ENABLE = false;
// If tx reduce-relay feature is disabled

View File

@@ -737,8 +737,44 @@ Config::loadFromString(std::string const& fileContents)
if (exists(SECTION_REDUCE_RELAY))
{
auto sec = section(SECTION_REDUCE_RELAY);
VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false);
VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false);
///////////////////// !!TEMPORARY CODE BLOCK!! ////////////////////////
// vp_enable config option is deprecated by vp_base_squelch_enable //
// This option is kept for backwards compatibility. When squelching //
// is the default algorithm, it must be replaced with: //
// VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = //
// sec.value_or("vp_base_squelch_enable", true); //
if (sec.exists("vp_base_squelch_enable") && sec.exists("vp_enable"))
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
" cannot specify both vp_base_squelch_enable and vp_enable "
"options. "
"vp_enable was deprecated and replaced by "
"vp_base_squelch_enable");
if (sec.exists("vp_base_squelch_enable"))
VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
sec.value_or("vp_base_squelch_enable", false);
else if (sec.exists("vp_enable"))
VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
sec.value_or("vp_enable", false);
else
VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = false;
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
///////////////////// !!TEMPORARY CODE BLOCK!! ///////////////////////
// Temporary squelching config for the peers selected as a source of //
// validator messages. The config must be removed once squelching is //
// made the default routing algorithm. //
VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS =
sec.value_or("vp_base_squelch_max_selected_peers", 5);
if (VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS < 3)
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
" vp_base_squelch_max_selected_peers must be "
"greater than or equal to 3");
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);
@@ -747,9 +783,9 @@ Config::loadFromString(std::string const& fileContents)
TX_REDUCE_RELAY_MIN_PEERS < 10)
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
", tx_min_peers must be greater or equal to 10"
", tx_relay_percentage must be greater or equal to 10 "
"and less or equal to 100");
", tx_min_peers must be greater than or equal to 10"
", tx_relay_percentage must be greater than or equal to 10 "
"and less than or equal to 100");
}
if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_))

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED
#define RIPPLE_OVERLAY_SLOT_H_INCLUDED
#include <xrpld/core/Config.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
@@ -32,7 +33,6 @@
#include <xrpl/protocol/messages.h>
#include <algorithm>
#include <memory>
#include <optional>
#include <set>
#include <tuple>
@@ -109,16 +109,25 @@ private:
using id_t = Peer::id_t;
using time_point = typename clock_type::time_point;
// a callback to report ignored squelches
using ignored_squelch_callback = std::function<void()>;
/** Constructor
* @param journal Journal for logging
* @param handler Squelch/Unsquelch implementation
* @param maxSelectedPeers the maximum number of peers to be selected as
* validator message source
*/
Slot(SquelchHandler const& handler, beast::Journal journal)
Slot(
SquelchHandler const& handler,
beast::Journal journal,
uint16_t maxSelectedPeers)
: reachedThreshold_(0)
, lastSelected_(clock_type::now())
, state_(SlotState::Counting)
, handler_(handler)
, journal_(journal)
, maxSelectedPeers_(maxSelectedPeers)
{
}
@@ -129,7 +138,7 @@ private:
* slot's state to Counting. If the number of messages for the peer is >
* MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
* number of considered peers who reached MAX_MESSAGE_THRESHOLD is
* MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from
* maxSelectedPeers_ then randomly select maxSelectedPeers_ from
* considered peers, and call squelch handler for each peer, which is not
* selected and not already in Squelched state. Set the state for those
* peers to Squelched and reset the count of all peers. Set slot's state to
@@ -139,9 +148,14 @@ private:
* @param id Peer id which received the message
* @param type Message type (Validation and Propose Set only,
* others are ignored, future use)
* @param callback A callback to report ignored squelches
*/
void
update(PublicKey const& validator, id_t id, protocol::MessageType type);
update(
PublicKey const& validator,
id_t id,
protocol::MessageType type,
ignored_squelch_callback callback);
/** Handle peer deletion when a peer disconnects.
* If the peer is in Selected state then
@@ -223,17 +237,26 @@ private:
time_point expire; // squelch expiration time
time_point lastMessage; // time last message received
};
std::unordered_map<id_t, PeerInfo> peers_; // peer's data
// pool of peers considered as the source of messages
// from validator - peers that reached MIN_MESSAGE_THRESHOLD
std::unordered_set<id_t> considered_;
// number of peers that reached MAX_MESSAGE_THRESHOLD
std::uint16_t reachedThreshold_;
// last time peers were selected, used to age the slot
typename clock_type::time_point lastSelected_;
SlotState state_; // slot's state
SquelchHandler const& handler_; // squelch/unsquelch handler
beast::Journal const journal_; // logging
// the maximum number of peers that should be selected as a validator
// message source
uint16_t const maxSelectedPeers_;
};
template <typename clock_type>
@@ -264,7 +287,8 @@ void
Slot<clock_type>::update(
PublicKey const& validator,
id_t id,
protocol::MessageType type)
protocol::MessageType type,
ignored_squelch_callback callback)
{
using namespace std::chrono;
auto now = clock_type::now();
@@ -302,6 +326,10 @@ Slot<clock_type>::update(
peer.lastMessage = now;
// report if we received a message from a squelched peer
if (peer.state == PeerState::Squelched)
callback();
if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
return;
@@ -319,17 +347,17 @@ Slot<clock_type>::update(
return;
}
if (reachedThreshold_ == MAX_SELECTED_PEERS)
if (reachedThreshold_ == maxSelectedPeers_)
{
// Randomly select MAX_SELECTED_PEERS peers from considered.
// Randomly select maxSelectedPeers_ peers from considered.
// Exclude peers that have been idling > IDLED -
// it's possible that deleteIdlePeer() has not been called yet.
// If number of remaining peers != MAX_SELECTED_PEERS
// If number of remaining peers != maxSelectedPeers_
// then reset the Counting state and let deleteIdlePeer() handle
// idled peers.
std::unordered_set<id_t> selected;
auto const consideredPoolSize = considered_.size();
while (selected.size() != MAX_SELECTED_PEERS && considered_.size() != 0)
while (selected.size() != maxSelectedPeers_ && considered_.size() != 0)
{
auto i =
considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
@@ -347,7 +375,7 @@ Slot<clock_type>::update(
selected.insert(id);
}
if (selected.size() != MAX_SELECTED_PEERS)
if (selected.size() != maxSelectedPeers_)
{
JLOG(journal_.trace())
<< "update: selection failed " << Slice(validator) << " " << id;
@@ -364,7 +392,7 @@ Slot<clock_type>::update(
<< *std::next(s, 1) << " " << *std::next(s, 2);
XRPL_ASSERT(
peers_.size() >= MAX_SELECTED_PEERS,
peers_.size() >= maxSelectedPeers_,
"ripple::reduce_relay::Slot::update : minimum peers");
// squelch peers which are not selected and
@@ -382,7 +410,7 @@ Slot<clock_type>::update(
str << k << " ";
v.state = PeerState::Squelched;
std::chrono::seconds duration =
getSquelchDuration(peers_.size() - MAX_SELECTED_PEERS);
getSquelchDuration(peers_.size() - maxSelectedPeers_);
v.expire = now + duration;
handler_.squelch(validator, k, duration.count());
}
@@ -544,15 +572,41 @@ class Slots final
public:
/**
* @param app Applicaton reference
* @param logs reference to the logger
* @param handler Squelch/unsquelch implementation
* @param config reference to the global config
*/
Slots(Logs& logs, SquelchHandler const& handler)
: handler_(handler), logs_(logs), journal_(logs.journal("Slots"))
Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
: handler_(handler)
, logs_(logs)
, journal_(logs.journal("Slots"))
, baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
, maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
{
}
~Slots() = default;
/** Calls Slot::update of Slot associated with the validator.
/** Check if base squelching feature is enabled and ready */
bool
baseSquelchReady()
{
return baseSquelchEnabled_ && reduceRelayReady();
}
/** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */
bool
reduceRelayReady()
{
if (!reduceRelayReady_)
reduceRelayReady_ =
reduce_relay::epoch<std::chrono::minutes>(clock_type::now()) >
reduce_relay::WAIT_ON_BOOTUP;
return reduceRelayReady_;
}
/** Calls Slot::update of Slot associated with the validator, with a noop
* callback.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
@@ -563,7 +617,25 @@ public:
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type);
protocol::MessageType type)
{
updateSlotAndSquelch(key, validator, id, type, []() {});
}
/** Calls Slot::update of Slot associated with the validator.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
* @param type Received protocol message type
* @param callback A callback to report ignored validations
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type,
typename Slot<clock_type>::ignored_squelch_callback callback);
/** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator.
@@ -651,10 +723,16 @@ private:
bool
addPeerMessage(uint256 const& key, id_t id);
std::atomic_bool reduceRelayReady_{false};
hash_map<PublicKey, Slot<clock_type>> slots_;
SquelchHandler const& handler_; // squelch/unsquelch handler
Logs& logs_;
beast::Journal const journal_;
bool const baseSquelchEnabled_;
uint16_t const maxSelectedPeers_;
// Maintain aged container of message/peers. This is required
// to discard duplicate message from the same peer. A message
// is aged after IDLED seconds. A message received IDLED seconds
@@ -702,7 +780,8 @@ Slots<clock_type>::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type)
protocol::MessageType type,
typename Slot<clock_type>::ignored_squelch_callback callback)
{
if (!addPeerMessage(key, id))
return;
@@ -712,15 +791,17 @@ Slots<clock_type>::updateSlotAndSquelch(
{
JLOG(journal_.trace())
<< "updateSlotAndSquelch: new slot " << Slice(validator);
auto it = slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(handler_, logs_.journal("Slot"))))
.first;
it->second.update(validator, id, type);
auto it =
slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(
handler_, logs_.journal("Slot"), maxSelectedPeers_)))
.first;
it->second.update(validator, id, type, callback);
}
else
it->second.update(validator, id, type);
it->second.update(validator, id, type, callback);
}
template <typename clock_type>

View File

@@ -209,7 +209,7 @@ ConnectAttempt::onHandshake(error_code ec)
app_.config().COMPRESSION,
app_.config().LEDGER_REPLAY,
app_.config().TX_REDUCE_RELAY_ENABLE,
app_.config().VP_REDUCE_RELAY_ENABLE);
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
buildHandshake(
req_,

View File

@@ -414,7 +414,7 @@ makeResponse(
app.config().COMPRESSION,
app.config().LEDGER_REPLAY,
app.config().TX_REDUCE_RELAY_ENABLE,
app.config().VP_REDUCE_RELAY_ENABLE));
app.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE));
buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app);

View File

@@ -139,7 +139,7 @@ makeResponse(
// compression feature
static constexpr char FEATURE_COMPR[] = "compr";
// validation/proposal reduce-relay feature
// validation/proposal reduce-relay base squelch feature
static constexpr char FEATURE_VPRR[] = "vprr";
// transaction reduce-relay feature
static constexpr char FEATURE_TXRR[] = "txrr";
@@ -221,7 +221,7 @@ peerFeatureEnabled(
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
base squelch feature is enabled
@return X-Protocol-Ctl header value
*/
std::string
@@ -241,8 +241,7 @@ makeFeaturesRequestHeader(
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
base squelch feature is enabled
@return X-Protocol-Ctl header value
*/
std::string

View File

@@ -142,7 +142,7 @@ OverlayImpl::OverlayImpl(
, m_resolver(resolver)
, next_id_(1)
, timer_count_(0)
, slots_(app.logs(), *this)
, slots_(app.logs(), *this, app.config())
, m_stats(
std::bind(&OverlayImpl::collect_metrics, this),
collector,
@@ -1390,8 +1390,7 @@ makeSquelchMessage(
void
OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
{
if (auto peer = findPeerByShortID(id);
peer && app_.config().VP_REDUCE_RELAY_SQUELCH)
if (auto peer = findPeerByShortID(id); peer)
{
// optimize - multiple message with different
// validator might be sent to the same peer
@@ -1405,8 +1404,7 @@ OverlayImpl::squelch(
Peer::id_t id,
uint32_t squelchDuration) const
{
if (auto peer = findPeerByShortID(id);
peer && app_.config().VP_REDUCE_RELAY_SQUELCH)
if (auto peer = findPeerByShortID(id); peer)
{
peer->send(makeSquelchMessage(validator, true, squelchDuration));
}
@@ -1419,6 +1417,9 @@ OverlayImpl::updateSlotAndSquelch(
std::set<Peer::id_t>&& peers,
protocol::MessageType type)
{
if (!slots_.baseSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(
strand_,
@@ -1427,7 +1428,9 @@ OverlayImpl::updateSlotAndSquelch(
});
for (auto id : peers)
slots_.updateSlotAndSquelch(key, validator, id, type);
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
}
void
@@ -1437,12 +1440,17 @@ OverlayImpl::updateSlotAndSquelch(
Peer::id_t peer,
protocol::MessageType type)
{
if (!slots_.baseSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(strand_, [this, key, validator, peer, type]() {
updateSlotAndSquelch(key, validator, peer, type);
});
slots_.updateSlotAndSquelch(key, validator, peer, type);
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
}
void

View File

@@ -113,20 +113,21 @@ PeerImp::PeerImp(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
JLOG(journal_.info()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled "
<< vpReduceRelayEnabled_
<< " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
}
PeerImp::~PeerImp()
@@ -1733,8 +1734,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed.
if (reduceRelayReady() && relayed &&
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
@@ -2381,10 +2381,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed. Wait WAIT_ON_BOOTUP time to let the server establish
// connections to peers.
if (reduceRelayReady() && relayed &&
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
@@ -3005,7 +3003,7 @@ PeerImp::checkPropose(
// as part of the squelch logic.
auto haveMessage = app_.overlay().relay(
*packet, peerPos.suppressionID(), peerPos.publicKey());
if (reduceRelayReady() && !haveMessage.empty())
if (!haveMessage.empty())
overlay_.updateSlotAndSquelch(
peerPos.suppressionID(),
peerPos.publicKey(),
@@ -3040,7 +3038,7 @@ PeerImp::checkValidation(
// as part of the squelch logic.
auto haveMessage =
overlay_.relay(*packet, key, val->getSignerPublic());
if (reduceRelayReady() && !haveMessage.empty())
if (!haveMessage.empty())
{
overlay_.updateSlotAndSquelch(
key,
@@ -3506,16 +3504,6 @@ PeerImp::isHighLatency() const
return latency_ >= peerHighLatency;
}
bool
PeerImp::reduceRelayReady()
{
if (!reduceRelayReady_)
reduceRelayReady_ =
reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
reduce_relay::WAIT_ON_BOOTUP;
return vpReduceRelayEnabled_ && reduceRelayReady_;
}
void
PeerImp::Metrics::add_message(std::uint64_t bytes)
{

View File

@@ -116,7 +116,6 @@ private:
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
inline static std::atomic_bool reduceRelayReady_{false};
// Notes on thread locking:
//
@@ -190,9 +189,7 @@ private:
hash_set<uint256> txQueue_;
// true if tx reduce-relay feature is enabled on the peer.
bool txReduceRelayEnabled_ = false;
// true if validation/proposal reduce-relay feature is enabled
// on the peer.
bool vpReduceRelayEnabled_ = false;
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
@@ -521,11 +518,6 @@ private:
handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m);
// Check if reduce-relay feature is enabled and
// reduce_relay::WAIT_ON_BOOTUP time passed since the start
bool
reduceRelayReady();
public:
//--------------------------------------------------------------------------
//
@@ -705,7 +697,6 @@ PeerImp::PeerImp(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
@@ -714,13 +705,15 @@ PeerImp::PeerImp(
{
read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
JLOG(journal_.info()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled "
<< vpReduceRelayEnabled_
<< " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
}
template <class FwdIt, class>

View File

@@ -109,6 +109,8 @@ public:
squelch,
squelch_suppressed, // egress traffic amount suppressed by squelching
squelch_ignored, // the traffic amount that came from peers ignoring
// squelch messages
// TMHaveSet message:
get_set, // transaction sets we try to get
@@ -262,6 +264,7 @@ public:
{validatorlist, "validator_lists"},
{squelch, "squelch"},
{squelch_suppressed, "squelch_suppressed"},
{squelch_ignored, "squelch_ignored"},
{get_set, "set_get"},
{share_set, "set_share"},
{ld_tsc_get, "ledger_data_Transaction_Set_candidate_get"},
@@ -326,6 +329,7 @@ protected:
{validatorlist, {validatorlist}},
{squelch, {squelch}},
{squelch_suppressed, {squelch_suppressed}},
{squelch_ignored, {squelch_ignored}},
{get_set, {get_set}},
{share_set, {share_set}},
{ld_tsc_get, {ld_tsc_get}},