Merge remote-tracking branch 'upstream/develop' into develop3

This commit is contained in:
Mayukha Vadari
2025-05-29 13:05:11 -04:00
17 changed files with 503 additions and 164 deletions

View File

@@ -2273,6 +2273,8 @@ class Vault_test : public beast::unit_test::suite
env(pay(issuer, owner, asset(500)));
env.trust(asset(1000), depositor);
env(pay(issuer, depositor, asset(500)));
env.trust(asset(1000), charlie);
env(pay(issuer, charlie, asset(5)));
env.close();
auto [tx, keylet] = vault.create(
@@ -2362,7 +2364,7 @@ class Vault_test : public beast::unit_test::suite
env(credentials::create(depositor, credIssuer1, credType));
env(credentials::accept(depositor, credIssuer1, credType));
env(credentials::create(charlie, credIssuer1, credType));
env(credentials::accept(charlie, credIssuer1, credType));
// charlie's credential not accepted
env.close();
auto credSle = env.le(credKeylet);
BEAST_EXPECT(credSle != nullptr);
@@ -2376,7 +2378,7 @@ class Vault_test : public beast::unit_test::suite
tx = vault.deposit(
{.depositor = charlie, .id = keylet.key, .amount = asset(50)});
env(tx, ter{tecINSUFFICIENT_FUNDS});
env(tx, ter{tecNO_AUTH});
env.close();
}
@@ -2384,6 +2386,8 @@ class Vault_test : public beast::unit_test::suite
testcase("private vault depositor lost authorization");
env(credentials::deleteCred(
credIssuer1, depositor, credIssuer1, credType));
env(credentials::deleteCred(
credIssuer1, charlie, credIssuer1, credType));
env.close();
auto credSle = env.le(credKeylet);
BEAST_EXPECT(credSle == nullptr);
@@ -2396,18 +2400,84 @@ class Vault_test : public beast::unit_test::suite
env.close();
}
{
testcase("private vault depositor new authorization");
env(credentials::create(depositor, credIssuer2, credType));
env(credentials::accept(depositor, credIssuer2, credType));
env.close();
auto const shares = [&env, keylet = keylet, this]() -> Asset {
auto const vault = env.le(keylet);
BEAST_EXPECT(vault != nullptr);
return MPTIssue(vault->at(sfShareMPTID));
}();
auto tx = vault.deposit(
{.depositor = depositor,
.id = keylet.key,
.amount = asset(50)});
env(tx);
env.close();
{
testcase("private vault expired authorization");
uint32_t const closeTime = env.current()
->info()
.parentCloseTime.time_since_epoch()
.count();
{
auto tx0 =
credentials::create(depositor, credIssuer2, credType);
tx0[sfExpiration] = closeTime + 20;
env(tx0);
tx0 = credentials::create(charlie, credIssuer2, credType);
tx0[sfExpiration] = closeTime + 20;
env(tx0);
env.close();
env(credentials::accept(depositor, credIssuer2, credType));
env(credentials::accept(charlie, credIssuer2, credType));
env.close();
}
{
auto tx1 = vault.deposit(
{.depositor = depositor,
.id = keylet.key,
.amount = asset(50)});
env(tx1);
env.close();
auto const tokenKeylet = keylet::mptoken(
shares.get<MPTIssue>().getMptID(), depositor.id());
BEAST_EXPECT(env.le(tokenKeylet) != nullptr);
}
{
// time advance
env.close();
env.close();
env.close();
auto const credsKeylet =
credentials::keylet(depositor, credIssuer2, credType);
BEAST_EXPECT(env.le(credsKeylet) != nullptr);
auto tx2 = vault.deposit(
{.depositor = depositor,
.id = keylet.key,
.amount = asset(1)});
env(tx2, ter{tecEXPIRED});
env.close();
BEAST_EXPECT(env.le(credsKeylet) == nullptr);
}
{
auto const credsKeylet =
credentials::keylet(charlie, credIssuer2, credType);
BEAST_EXPECT(env.le(credsKeylet) != nullptr);
auto const tokenKeylet = keylet::mptoken(
shares.get<MPTIssue>().getMptID(), charlie.id());
BEAST_EXPECT(env.le(tokenKeylet) == nullptr);
auto tx3 = vault.deposit(
{.depositor = charlie,
.id = keylet.key,
.amount = asset(2)});
env(tx3, ter{tecEXPIRED});
env.close();
BEAST_EXPECT(env.le(credsKeylet) == nullptr);
BEAST_EXPECT(env.le(tokenKeylet) == nullptr);
}
}
{

View File

@@ -589,13 +589,16 @@ public:
}
/** Return metadata for the last JTx.
Effects:
The open ledger is closed as if by a call
to close(). The metadata for the last
transaction ID, if any, is returned.
*/
*
* NOTE: this has a side effect of closing the open ledger.
* The ledger will only be closed if it includes transactions.
*
* Effects:
*
* The open ledger is closed as if by a call
* to close(). The metadata for the last
* transaction ID, if any, is returned.
*/
std::shared_ptr<STObject const>
meta();

View File

@@ -446,7 +446,12 @@ Env::postconditions(
std::shared_ptr<STObject const>
Env::meta()
{
close();
if (current()->txCount() != 0)
{
// close the ledger if it has not already been closed
// (metadata is not finalized until the ledger is closed)
close();
}
auto const item = closed()->txRead(txid_);
return item.second;
}

View File

@@ -473,17 +473,14 @@ public:
Config c;
std::stringstream str;
str << "[reduce_relay]\n"
<< "vp_enable=1\n"
<< "vp_squelch=1\n"
<< "vp_base_squelch_enable=1\n"
<< "[compression]\n"
<< enable << "\n";
c.loadFromString(str.str());
auto env = std::make_shared<jtx::Env>(*this);
env->app().config().COMPRESSION = c.COMPRESSION;
env->app().config().VP_REDUCE_RELAY_ENABLE =
c.VP_REDUCE_RELAY_ENABLE;
env->app().config().VP_REDUCE_RELAY_SQUELCH =
c.VP_REDUCE_RELAY_SQUELCH;
env->app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE;
return env;
};
auto handshake = [&](int outboundEnable, int inboundEnable) {
@@ -496,7 +493,7 @@ public:
env->app().config().COMPRESSION,
false,
env->app().config().TX_REDUCE_RELAY_ENABLE,
env->app().config().VP_REDUCE_RELAY_ENABLE);
env->app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/Env.h>
#include <xrpld/overlay/Message.h>
@@ -32,6 +33,8 @@
#include <boost/thread.hpp>
#include <chrono>
#include <iostream>
#include <numeric>
#include <optional>
@@ -517,7 +520,8 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
public:
using id_t = Peer::id_t;
using clock_type = ManualClock;
OverlaySim(Application& app) : slots_(app.logs(), *this), logs_(app.logs())
OverlaySim(Application& app)
: slots_(app.logs(), *this, app.config()), logs_(app.logs())
{
}
@@ -985,7 +989,10 @@ protected:
network_.overlay().isCountingState(validator);
BEAST_EXPECT(
countingState == false &&
selected.size() == reduce_relay::MAX_SELECTED_PEERS);
selected.size() ==
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
}
// Trigger Link Down or Peer Disconnect event
@@ -1187,7 +1194,10 @@ protected:
{
BEAST_EXPECT(
squelched ==
MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
MAX_PEERS -
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
n++;
}
},
@@ -1196,7 +1206,9 @@ protected:
purge,
resetClock);
auto selected = network_.overlay().getSelected(network_.validator(0));
BEAST_EXPECT(selected.size() == reduce_relay::MAX_SELECTED_PEERS);
BEAST_EXPECT(
selected.size() ==
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(n == 1); // only one selection round
auto res = checkCounting(network_.validator(0), false);
BEAST_EXPECT(res);
@@ -1260,7 +1272,11 @@ protected:
unsquelched++;
});
BEAST_EXPECT(
unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
unsquelched ==
MAX_PEERS -
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(checkCounting(network_.validator(0), true));
});
}
@@ -1281,7 +1297,11 @@ protected:
});
auto peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(
unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
unsquelched ==
MAX_PEERS -
env_.app()
.config()
.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
BEAST_EXPECT(checkCounting(network_.validator(0), true));
});
}
@@ -1313,42 +1333,164 @@ protected:
void
testConfig(bool log)
{
doTest("Config Test", log, [&](bool log) {
doTest("Test Config - squelch enabled (legacy)", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enable=1
vp_squelch=1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENABLE == true);
BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH == true);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true);
});
doTest("Test Config - squelch disabled (legacy)", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enable=0
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
Config c1;
toLoad = (R"rippleConfig(
toLoad = R"rippleConfig(
[reduce_relay]
vp_enable=0
vp_squelch=0
)rippleConfig");
)rippleConfig";
c1.loadFromString(toLoad);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_ENABLE == false);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH == false);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
});
doTest("Test Config - squelch enabled", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true);
});
doTest("Test Config - squelch disabled", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=0
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
});
doTest("Test Config - legacy and new", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=0
vp_enable=0
)rippleConfig");
std::string error;
auto const expectedError =
"Invalid reduce_relay"
" cannot specify both vp_base_squelch_enable and vp_enable "
"options. "
"vp_enable was deprecated and replaced by "
"vp_base_squelch_enable";
try
{
c.loadFromString(toLoad);
}
catch (std::runtime_error& e)
{
error = e.what();
}
BEAST_EXPECT(error == expectedError);
});
doTest("Test Config - max selected peers", log, [&](bool log) {
Config c;
std::string toLoad(R"rippleConfig(
[reduce_relay]
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 5);
Config c1;
toLoad = R"rippleConfig(
[reduce_relay]
vp_base_squelch_max_selected_peers=6
)rippleConfig";
c1.loadFromString(toLoad);
BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 6);
Config c2;
toLoad = R"rippleConfig(
[reduce_relay]
vp_enabled=1
vp_squelched=1
vp_base_squelch_max_selected_peers=2
)rippleConfig";
c2.loadFromString(toLoad);
BEAST_EXPECT(c2.VP_REDUCE_RELAY_ENABLE == false);
BEAST_EXPECT(c2.VP_REDUCE_RELAY_SQUELCH == false);
std::string error;
auto const expectedError =
"Invalid reduce_relay"
" vp_base_squelch_max_selected_peers must be "
"greater than or equal to 3";
try
{
c2.loadFromString(toLoad);
}
catch (std::runtime_error& e)
{
error = e.what();
}
BEAST_EXPECT(error == expectedError);
});
}
void
testBaseSquelchReady(bool log)
{
doTest("BaseSquelchReady", log, [&](bool log) {
ManualClock::reset();
auto createSlots = [&](bool baseSquelchEnabled)
-> reduce_relay::Slots<ManualClock> {
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
baseSquelchEnabled;
return reduce_relay::Slots<ManualClock>(
env_.app().logs(), network_.overlay(), env_.app().config());
};
// base squelching must not be ready if squelching is disabled
BEAST_EXPECT(!createSlots(false).baseSquelchReady());
// base squelch must not be ready as not enough time passed from
// bootup
BEAST_EXPECT(!createSlots(true).baseSquelchReady());
ManualClock::advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1});
// base squelch enabled and bootup time passed
BEAST_EXPECT(createSlots(true).baseSquelchReady());
// even if time passed, base squelching must not be ready if turned
// off in the config
BEAST_EXPECT(!createSlots(false).baseSquelchReady());
});
}
@@ -1424,7 +1566,7 @@ vp_squelched=1
auto run = [&](int npeers) {
handler.maxDuration_ = 0;
reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler);
env_.app().logs(), handler, env_.app().config());
// 1st message from a new peer switches the slot
// to counting state and resets the counts of all peers +
// MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold
@@ -1502,14 +1644,12 @@ vp_squelched=1
std::stringstream str;
str << "[reduce_relay]\n"
<< "vp_enable=" << enable << "\n"
<< "vp_squelch=" << enable << "\n"
<< "[compression]\n"
<< "1\n";
c.loadFromString(str.str());
env_.app().config().VP_REDUCE_RELAY_ENABLE =
c.VP_REDUCE_RELAY_ENABLE;
env_.app().config().VP_REDUCE_RELAY_SQUELCH =
c.VP_REDUCE_RELAY_SQUELCH;
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE;
env_.app().config().COMPRESSION = c.COMPRESSION;
};
auto handshake = [&](int outboundEnable, int inboundEnable) {
@@ -1522,7 +1662,7 @@ vp_squelched=1
env_.app().config().COMPRESSION,
false,
env_.app().config().TX_REDUCE_RELAY_ENABLE,
env_.app().config().VP_REDUCE_RELAY_ENABLE);
env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();
@@ -1562,7 +1702,13 @@ vp_squelched=1
Network network_;
public:
reduce_relay_test() : env_(*this), network_(env_.app())
reduce_relay_test()
: env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
return cfg;
}))
, network_(env_.app())
{
}
@@ -1581,6 +1727,7 @@ public:
testInternalHashRouter(log);
testRandomSquelch(log);
testHandshake(log);
testBaseSquelchReady(log);
}
};

View File

@@ -336,9 +336,7 @@ verifyValidDomain(
credentials.push_back(keyletCredential.key);
}
// Result intentionally ignored.
[[maybe_unused]] bool _ = credentials::removeExpired(view, credentials, j);
bool const foundExpired = credentials::removeExpired(view, credentials, j);
for (auto const& h : credentials)
{
auto sleCredential = view.read(keylet::credential(h));
@@ -349,7 +347,7 @@ verifyValidDomain(
return tesSUCCESS;
}
return tecNO_PERMISSION;
return foundExpired ? tecEXPIRED : tecNO_PERMISSION;
}
TER

View File

@@ -251,19 +251,18 @@ public:
// size, but we allow admins to explicitly set it in the config.
std::optional<int> SWEEP_INTERVAL;
// Reduce-relay - these parameters are experimental.
// Enable reduce-relay features
// Validation/proposal reduce-relay feature
bool VP_REDUCE_RELAY_ENABLE = false;
// Send squelch message to peers. Generally this config should
// have the same value as VP_REDUCE_RELAY_ENABLE. It can be
// used for testing the feature's function without
// affecting the message relaying. To use it for testing,
// set it to false and set VP_REDUCE_RELAY_ENABLE to true.
// Squelch messages will not be sent to the peers in this case.
// Set log level to debug so that the feature function can be
// analyzed.
bool VP_REDUCE_RELAY_SQUELCH = false;
// Reduce-relay - Experimental parameters to control p2p routing algorithms
// Enable base squelching of duplicate validation/proposal messages
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = false;
///////////////////// !!TEMPORARY CODE BLOCK!! ////////////////////////
// Temporary squelching config for the peers selected as a source of //
// validator messages. The config must be removed once squelching is //
// made the default routing algorithm //
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5;
///////////////// END OF TEMPORARY CODE BLOCK /////////////////////
// Transaction reduce-relay feature
bool TX_REDUCE_RELAY_ENABLE = false;
// If tx reduce-relay feature is disabled

View File

@@ -737,8 +737,44 @@ Config::loadFromString(std::string const& fileContents)
if (exists(SECTION_REDUCE_RELAY))
{
auto sec = section(SECTION_REDUCE_RELAY);
VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false);
VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false);
///////////////////// !!TEMPORARY CODE BLOCK!! ////////////////////////
// vp_enable config option is deprecated by vp_base_squelch_enable //
// This option is kept for backwards compatibility. When squelching //
// is the default algorithm, it must be replaced with: //
// VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = //
// sec.value_or("vp_base_squelch_enable", true); //
if (sec.exists("vp_base_squelch_enable") && sec.exists("vp_enable"))
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
" cannot specify both vp_base_squelch_enable and vp_enable "
"options. "
"vp_enable was deprecated and replaced by "
"vp_base_squelch_enable");
if (sec.exists("vp_base_squelch_enable"))
VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
sec.value_or("vp_base_squelch_enable", false);
else if (sec.exists("vp_enable"))
VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
sec.value_or("vp_enable", false);
else
VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = false;
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
///////////////////// !!TEMPORARY CODE BLOCK!! ///////////////////////
// Temporary squelching config for the peers selected as a source of //
// validator messages. The config must be removed once squelching is //
// made the default routing algorithm. //
VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS =
sec.value_or("vp_base_squelch_max_selected_peers", 5);
if (VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS < 3)
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
" vp_base_squelch_max_selected_peers must be "
"greater than or equal to 3");
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);
@@ -747,9 +783,9 @@ Config::loadFromString(std::string const& fileContents)
TX_REDUCE_RELAY_MIN_PEERS < 10)
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
", tx_min_peers must be greater or equal to 10"
", tx_relay_percentage must be greater or equal to 10 "
"and less or equal to 100");
", tx_min_peers must be greater than or equal to 10"
", tx_relay_percentage must be greater than or equal to 10 "
"and less than or equal to 100");
}
if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_))

View File

@@ -2389,8 +2389,19 @@ enforceMPTokenAuthorization(
auto const keylet = keylet::mptoken(mptIssuanceID, account);
auto const sleToken = view.read(keylet); // NOTE: might be null
auto const maybeDomainID = sleIssuance->at(~sfDomainID);
bool const authorizedByDomain = maybeDomainID.has_value() &&
verifyValidDomain(view, account, *maybeDomainID, j) == tesSUCCESS;
bool expired = false;
bool const authorizedByDomain = [&]() -> bool {
// NOTE: defensive here, shuld be checked in preclaim
if (!maybeDomainID.has_value())
return false; // LCOV_EXCL_LINE
auto const ter = verifyValidDomain(view, account, *maybeDomainID, j);
if (isTesSuccess(ter))
return true;
if (ter == tecEXPIRED)
expired = true;
return false;
}();
if (!authorizedByDomain && sleToken == nullptr)
{
@@ -2401,14 +2412,14 @@ enforceMPTokenAuthorization(
// 3. Account has all expired credentials (deleted in verifyValidDomain)
//
// Either way, return tecNO_AUTH and there is nothing else to do
return tecNO_AUTH;
return expired ? tecEXPIRED : tecNO_AUTH;
}
else if (!authorizedByDomain && maybeDomainID.has_value())
{
// Found an MPToken but the account is not authorized and we expect
// it to have been authorized by the domain. This could be because the
// credentials used to create the MPToken have expired or been deleted.
return tecNO_AUTH;
return expired ? tecEXPIRED : tecNO_AUTH;
}
else if (!authorizedByDomain)
{

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED
#define RIPPLE_OVERLAY_SLOT_H_INCLUDED
#include <xrpld/core/Config.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/ReduceRelayCommon.h>
@@ -32,7 +33,6 @@
#include <xrpl/protocol/messages.h>
#include <algorithm>
#include <memory>
#include <optional>
#include <set>
#include <tuple>
@@ -109,16 +109,25 @@ private:
using id_t = Peer::id_t;
using time_point = typename clock_type::time_point;
// a callback to report ignored squelches
using ignored_squelch_callback = std::function<void()>;
/** Constructor
* @param journal Journal for logging
* @param handler Squelch/Unsquelch implementation
* @param maxSelectedPeers the maximum number of peers to be selected as
* validator message source
*/
Slot(SquelchHandler const& handler, beast::Journal journal)
Slot(
SquelchHandler const& handler,
beast::Journal journal,
uint16_t maxSelectedPeers)
: reachedThreshold_(0)
, lastSelected_(clock_type::now())
, state_(SlotState::Counting)
, handler_(handler)
, journal_(journal)
, maxSelectedPeers_(maxSelectedPeers)
{
}
@@ -129,7 +138,7 @@ private:
* slot's state to Counting. If the number of messages for the peer is >
* MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
* number of considered peers who reached MAX_MESSAGE_THRESHOLD is
* MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from
* maxSelectedPeers_ then randomly select maxSelectedPeers_ from
* considered peers, and call squelch handler for each peer, which is not
* selected and not already in Squelched state. Set the state for those
* peers to Squelched and reset the count of all peers. Set slot's state to
@@ -139,9 +148,14 @@ private:
* @param id Peer id which received the message
* @param type Message type (Validation and Propose Set only,
* others are ignored, future use)
* @param callback A callback to report ignored squelches
*/
void
update(PublicKey const& validator, id_t id, protocol::MessageType type);
update(
PublicKey const& validator,
id_t id,
protocol::MessageType type,
ignored_squelch_callback callback);
/** Handle peer deletion when a peer disconnects.
* If the peer is in Selected state then
@@ -223,17 +237,26 @@ private:
time_point expire; // squelch expiration time
time_point lastMessage; // time last message received
};
std::unordered_map<id_t, PeerInfo> peers_; // peer's data
// pool of peers considered as the source of messages
// from validator - peers that reached MIN_MESSAGE_THRESHOLD
std::unordered_set<id_t> considered_;
// number of peers that reached MAX_MESSAGE_THRESHOLD
std::uint16_t reachedThreshold_;
// last time peers were selected, used to age the slot
typename clock_type::time_point lastSelected_;
SlotState state_; // slot's state
SquelchHandler const& handler_; // squelch/unsquelch handler
beast::Journal const journal_; // logging
// the maximum number of peers that should be selected as a validator
// message source
uint16_t const maxSelectedPeers_;
};
template <typename clock_type>
@@ -264,7 +287,8 @@ void
Slot<clock_type>::update(
PublicKey const& validator,
id_t id,
protocol::MessageType type)
protocol::MessageType type,
ignored_squelch_callback callback)
{
using namespace std::chrono;
auto now = clock_type::now();
@@ -302,6 +326,10 @@ Slot<clock_type>::update(
peer.lastMessage = now;
// report if we received a message from a squelched peer
if (peer.state == PeerState::Squelched)
callback();
if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
return;
@@ -319,17 +347,17 @@ Slot<clock_type>::update(
return;
}
if (reachedThreshold_ == MAX_SELECTED_PEERS)
if (reachedThreshold_ == maxSelectedPeers_)
{
// Randomly select MAX_SELECTED_PEERS peers from considered.
// Randomly select maxSelectedPeers_ peers from considered.
// Exclude peers that have been idling > IDLED -
// it's possible that deleteIdlePeer() has not been called yet.
// If number of remaining peers != MAX_SELECTED_PEERS
// If number of remaining peers != maxSelectedPeers_
// then reset the Counting state and let deleteIdlePeer() handle
// idled peers.
std::unordered_set<id_t> selected;
auto const consideredPoolSize = considered_.size();
while (selected.size() != MAX_SELECTED_PEERS && considered_.size() != 0)
while (selected.size() != maxSelectedPeers_ && considered_.size() != 0)
{
auto i =
considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
@@ -347,7 +375,7 @@ Slot<clock_type>::update(
selected.insert(id);
}
if (selected.size() != MAX_SELECTED_PEERS)
if (selected.size() != maxSelectedPeers_)
{
JLOG(journal_.trace())
<< "update: selection failed " << Slice(validator) << " " << id;
@@ -364,7 +392,7 @@ Slot<clock_type>::update(
<< *std::next(s, 1) << " " << *std::next(s, 2);
XRPL_ASSERT(
peers_.size() >= MAX_SELECTED_PEERS,
peers_.size() >= maxSelectedPeers_,
"ripple::reduce_relay::Slot::update : minimum peers");
// squelch peers which are not selected and
@@ -382,7 +410,7 @@ Slot<clock_type>::update(
str << k << " ";
v.state = PeerState::Squelched;
std::chrono::seconds duration =
getSquelchDuration(peers_.size() - MAX_SELECTED_PEERS);
getSquelchDuration(peers_.size() - maxSelectedPeers_);
v.expire = now + duration;
handler_.squelch(validator, k, duration.count());
}
@@ -544,15 +572,41 @@ class Slots final
public:
/**
* @param app Applicaton reference
* @param logs reference to the logger
* @param handler Squelch/unsquelch implementation
* @param config reference to the global config
*/
Slots(Logs& logs, SquelchHandler const& handler)
: handler_(handler), logs_(logs), journal_(logs.journal("Slots"))
Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
: handler_(handler)
, logs_(logs)
, journal_(logs.journal("Slots"))
, baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
, maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
{
}
~Slots() = default;
/** Calls Slot::update of Slot associated with the validator.
/** Check if base squelching feature is enabled and ready */
bool
baseSquelchReady()
{
return baseSquelchEnabled_ && reduceRelayReady();
}
/** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */
bool
reduceRelayReady()
{
if (!reduceRelayReady_)
reduceRelayReady_ =
reduce_relay::epoch<std::chrono::minutes>(clock_type::now()) >
reduce_relay::WAIT_ON_BOOTUP;
return reduceRelayReady_;
}
/** Calls Slot::update of Slot associated with the validator, with a noop
* callback.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
@@ -563,7 +617,25 @@ public:
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type);
protocol::MessageType type)
{
updateSlotAndSquelch(key, validator, id, type, []() {});
}
/** Calls Slot::update of Slot associated with the validator.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
* @param type Received protocol message type
* @param callback A callback to report ignored validations
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type,
typename Slot<clock_type>::ignored_squelch_callback callback);
/** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator.
@@ -651,10 +723,16 @@ private:
bool
addPeerMessage(uint256 const& key, id_t id);
std::atomic_bool reduceRelayReady_{false};
hash_map<PublicKey, Slot<clock_type>> slots_;
SquelchHandler const& handler_; // squelch/unsquelch handler
Logs& logs_;
beast::Journal const journal_;
bool const baseSquelchEnabled_;
uint16_t const maxSelectedPeers_;
// Maintain aged container of message/peers. This is required
// to discard duplicate message from the same peer. A message
// is aged after IDLED seconds. A message received IDLED seconds
@@ -702,7 +780,8 @@ Slots<clock_type>::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type)
protocol::MessageType type,
typename Slot<clock_type>::ignored_squelch_callback callback)
{
if (!addPeerMessage(key, id))
return;
@@ -712,15 +791,17 @@ Slots<clock_type>::updateSlotAndSquelch(
{
JLOG(journal_.trace())
<< "updateSlotAndSquelch: new slot " << Slice(validator);
auto it = slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(handler_, logs_.journal("Slot"))))
.first;
it->second.update(validator, id, type);
auto it =
slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(
handler_, logs_.journal("Slot"), maxSelectedPeers_)))
.first;
it->second.update(validator, id, type, callback);
}
else
it->second.update(validator, id, type);
it->second.update(validator, id, type, callback);
}
template <typename clock_type>

View File

@@ -209,7 +209,7 @@ ConnectAttempt::onHandshake(error_code ec)
app_.config().COMPRESSION,
app_.config().LEDGER_REPLAY,
app_.config().TX_REDUCE_RELAY_ENABLE,
app_.config().VP_REDUCE_RELAY_ENABLE);
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
buildHandshake(
req_,

View File

@@ -414,7 +414,7 @@ makeResponse(
app.config().COMPRESSION,
app.config().LEDGER_REPLAY,
app.config().TX_REDUCE_RELAY_ENABLE,
app.config().VP_REDUCE_RELAY_ENABLE));
app.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE));
buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app);

View File

@@ -139,7 +139,7 @@ makeResponse(
// compression feature
static constexpr char FEATURE_COMPR[] = "compr";
// validation/proposal reduce-relay feature
// validation/proposal reduce-relay base squelch feature
static constexpr char FEATURE_VPRR[] = "vprr";
// transaction reduce-relay feature
static constexpr char FEATURE_TXRR[] = "txrr";
@@ -221,7 +221,7 @@ peerFeatureEnabled(
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
base squelch feature is enabled
@return X-Protocol-Ctl header value
*/
std::string
@@ -241,8 +241,7 @@ makeFeaturesRequestHeader(
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
base squelch feature is enabled
@return X-Protocol-Ctl header value
*/
std::string

View File

@@ -142,7 +142,7 @@ OverlayImpl::OverlayImpl(
, m_resolver(resolver)
, next_id_(1)
, timer_count_(0)
, slots_(app.logs(), *this)
, slots_(app.logs(), *this, app.config())
, m_stats(
std::bind(&OverlayImpl::collect_metrics, this),
collector,
@@ -1390,8 +1390,7 @@ makeSquelchMessage(
void
OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
{
if (auto peer = findPeerByShortID(id);
peer && app_.config().VP_REDUCE_RELAY_SQUELCH)
if (auto peer = findPeerByShortID(id); peer)
{
// optimize - multiple message with different
// validator might be sent to the same peer
@@ -1405,8 +1404,7 @@ OverlayImpl::squelch(
Peer::id_t id,
uint32_t squelchDuration) const
{
if (auto peer = findPeerByShortID(id);
peer && app_.config().VP_REDUCE_RELAY_SQUELCH)
if (auto peer = findPeerByShortID(id); peer)
{
peer->send(makeSquelchMessage(validator, true, squelchDuration));
}
@@ -1419,6 +1417,9 @@ OverlayImpl::updateSlotAndSquelch(
std::set<Peer::id_t>&& peers,
protocol::MessageType type)
{
if (!slots_.baseSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(
strand_,
@@ -1427,7 +1428,9 @@ OverlayImpl::updateSlotAndSquelch(
});
for (auto id : peers)
slots_.updateSlotAndSquelch(key, validator, id, type);
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
}
void
@@ -1437,12 +1440,17 @@ OverlayImpl::updateSlotAndSquelch(
Peer::id_t peer,
protocol::MessageType type)
{
if (!slots_.baseSquelchReady())
return;
if (!strand_.running_in_this_thread())
return post(strand_, [this, key, validator, peer, type]() {
updateSlotAndSquelch(key, validator, peer, type);
});
slots_.updateSlotAndSquelch(key, validator, peer, type);
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
});
}
void

View File

@@ -113,20 +113,21 @@ PeerImp::PeerImp(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
JLOG(journal_.info()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled "
<< vpReduceRelayEnabled_
<< " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
}
PeerImp::~PeerImp()
@@ -1733,8 +1734,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed.
if (reduceRelayReady() && relayed &&
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
@@ -2381,10 +2381,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed. Wait WAIT_ON_BOOTUP time to let the server establish
// connections to peers.
if (reduceRelayReady() && relayed &&
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
@@ -3005,7 +3003,7 @@ PeerImp::checkPropose(
// as part of the squelch logic.
auto haveMessage = app_.overlay().relay(
*packet, peerPos.suppressionID(), peerPos.publicKey());
if (reduceRelayReady() && !haveMessage.empty())
if (!haveMessage.empty())
overlay_.updateSlotAndSquelch(
peerPos.suppressionID(),
peerPos.publicKey(),
@@ -3040,7 +3038,7 @@ PeerImp::checkValidation(
// as part of the squelch logic.
auto haveMessage =
overlay_.relay(*packet, key, val->getSignerPublic());
if (reduceRelayReady() && !haveMessage.empty())
if (!haveMessage.empty())
{
overlay_.updateSlotAndSquelch(
key,
@@ -3506,16 +3504,6 @@ PeerImp::isHighLatency() const
return latency_ >= peerHighLatency;
}
bool
PeerImp::reduceRelayReady()
{
if (!reduceRelayReady_)
reduceRelayReady_ =
reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
reduce_relay::WAIT_ON_BOOTUP;
return vpReduceRelayEnabled_ && reduceRelayReady_;
}
void
PeerImp::Metrics::add_message(std::uint64_t bytes)
{

View File

@@ -116,7 +116,6 @@ private:
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
inline static std::atomic_bool reduceRelayReady_{false};
// Notes on thread locking:
//
@@ -190,9 +189,7 @@ private:
hash_set<uint256> txQueue_;
// true if tx reduce-relay feature is enabled on the peer.
bool txReduceRelayEnabled_ = false;
// true if validation/proposal reduce-relay feature is enabled
// on the peer.
bool vpReduceRelayEnabled_ = false;
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
@@ -521,11 +518,6 @@ private:
handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m);
// Check if reduce-relay feature is enabled and
// reduce_relay::WAIT_ON_BOOTUP time passed since the start
bool
reduceRelayReady();
public:
//--------------------------------------------------------------------------
//
@@ -705,7 +697,6 @@ PeerImp::PeerImp(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
@@ -714,13 +705,15 @@ PeerImp::PeerImp(
{
read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
JLOG(journal_.info()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled "
<< vpReduceRelayEnabled_
<< " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
}
template <class FwdIt, class>

View File

@@ -109,6 +109,8 @@ public:
squelch,
squelch_suppressed, // egress traffic amount suppressed by squelching
squelch_ignored, // the traffic amount that came from peers ignoring
// squelch messages
// TMHaveSet message:
get_set, // transaction sets we try to get
@@ -262,6 +264,7 @@ public:
{validatorlist, "validator_lists"},
{squelch, "squelch"},
{squelch_suppressed, "squelch_suppressed"},
{squelch_ignored, "squelch_ignored"},
{get_set, "set_get"},
{share_set, "set_share"},
{ld_tsc_get, "ledger_data_Transaction_Set_candidate_get"},
@@ -326,6 +329,7 @@ protected:
{validatorlist, {validatorlist}},
{squelch, {squelch}},
{squelch_suppressed, {squelch_suppressed}},
{squelch_ignored, {squelch_ignored}},
{get_set, {get_set}},
{share_set, {share_set}},
{ld_tsc_get, {ld_tsc_get}},