diff --git a/src/test/overlay/compression_test.cpp b/src/test/overlay/compression_test.cpp index 76c38fd59..4ecbe7f23 100644 --- a/src/test/overlay/compression_test.cpp +++ b/src/test/overlay/compression_test.cpp @@ -473,17 +473,14 @@ public: Config c; std::stringstream str; str << "[reduce_relay]\n" - << "vp_enable=1\n" - << "vp_squelch=1\n" + << "vp_base_squelch_enable=1\n" << "[compression]\n" << enable << "\n"; c.loadFromString(str.str()); auto env = std::make_shared(*this); env->app().config().COMPRESSION = c.COMPRESSION; - env->app().config().VP_REDUCE_RELAY_ENABLE = - c.VP_REDUCE_RELAY_ENABLE; - env->app().config().VP_REDUCE_RELAY_SQUELCH = - c.VP_REDUCE_RELAY_SQUELCH; + env->app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = + c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE; return env; }; auto handshake = [&](int outboundEnable, int inboundEnable) { @@ -496,7 +493,7 @@ public: env->app().config().COMPRESSION, false, env->app().config().TX_REDUCE_RELAY_ENABLE, - env->app().config().VP_REDUCE_RELAY_ENABLE); + env->app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index 18aebbe19..a8aafcfa0 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -17,6 +17,7 @@ */ //============================================================================== +#include #include #include @@ -32,6 +33,8 @@ #include +#include +#include #include #include @@ -517,7 +520,8 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler public: using id_t = Peer::id_t; using clock_type = ManualClock; - OverlaySim(Application& app) : slots_(app.logs(), *this), logs_(app.logs()) + OverlaySim(Application& app) + : slots_(app.logs(), *this, app.config()), logs_(app.logs()) { } @@ -986,7 +990,10 @@ protected: network_.overlay().isCountingState(validator); BEAST_EXPECT( countingState == false && - selected.size() == reduce_relay::MAX_SELECTED_PEERS); + selected.size() == + env_.app() + .config() + .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); } // Trigger Link Down or Peer Disconnect event @@ -1188,7 +1195,10 @@ protected: { BEAST_EXPECT( squelched == - MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS); + MAX_PEERS - + env_.app() + .config() + .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); n++; } }, @@ -1197,7 +1207,9 @@ protected: purge, resetClock); auto selected = network_.overlay().getSelected(network_.validator(0)); - BEAST_EXPECT(selected.size() == reduce_relay::MAX_SELECTED_PEERS); + BEAST_EXPECT( + selected.size() == + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); BEAST_EXPECT(n == 1); // only one selection round auto res = checkCounting(network_.validator(0), false); BEAST_EXPECT(res); @@ -1261,7 +1273,11 @@ protected: unsquelched++; }); BEAST_EXPECT( - unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS); + unsquelched == + MAX_PEERS - + env_.app() + .config() + .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); BEAST_EXPECT(checkCounting(network_.validator(0), true)); }); } @@ -1282,7 +1298,11 @@ protected: }); auto peers = network_.overlay().getPeers(network_.validator(0)); BEAST_EXPECT( - unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS); + unsquelched == + MAX_PEERS - + env_.app() + .config() + .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); BEAST_EXPECT(checkCounting(network_.validator(0), true)); }); } @@ -1314,42 +1334,164 @@ protected: void testConfig(bool log) { - doTest("Config Test", log, [&](bool log) { + doTest("Test Config - squelch enabled (legacy)", log, [&](bool log) { Config c; std::string toLoad(R"rippleConfig( [reduce_relay] vp_enable=1 -vp_squelch=1 )rippleConfig"); c.loadFromString(toLoad); - BEAST_EXPECT(c.VP_REDUCE_RELAY_ENABLE == true); - BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH == true); + BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true); + }); + + doTest("Test Config - squelch disabled (legacy)", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +vp_enable=0 +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false); Config c1; - toLoad = (R"rippleConfig( + toLoad = R"rippleConfig( [reduce_relay] -vp_enable=0 -vp_squelch=0 -)rippleConfig"); +)rippleConfig"; c1.loadFromString(toLoad); - BEAST_EXPECT(c1.VP_REDUCE_RELAY_ENABLE == false); - BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH == false); + BEAST_EXPECT(c1.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false); + }); + + doTest("Test Config - squelch enabled", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +vp_base_squelch_enable=1 +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true); + }); + + doTest("Test Config - squelch disabled", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +vp_base_squelch_enable=0 +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false); + }); + + doTest("Test Config - legacy and new", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +vp_base_squelch_enable=0 +vp_enable=0 +)rippleConfig"); + + std::string error; + auto const expectedError = + "Invalid reduce_relay" + " cannot specify both vp_base_squelch_enable and vp_enable " + "options. " + "vp_enable was deprecated and replaced by " + "vp_base_squelch_enable"; + + try + { + c.loadFromString(toLoad); + } + catch (std::runtime_error& e) + { + error = e.what(); + } + + BEAST_EXPECT(error == expectedError); + }); + + doTest("Test Config - max selected peers", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 5); + + Config c1; + + toLoad = R"rippleConfig( +[reduce_relay] +vp_base_squelch_max_selected_peers=6 +)rippleConfig"; + + c1.loadFromString(toLoad); + BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 6); Config c2; toLoad = R"rippleConfig( [reduce_relay] -vp_enabled=1 -vp_squelched=1 +vp_base_squelch_max_selected_peers=2 )rippleConfig"; - c2.loadFromString(toLoad); - BEAST_EXPECT(c2.VP_REDUCE_RELAY_ENABLE == false); - BEAST_EXPECT(c2.VP_REDUCE_RELAY_SQUELCH == false); + std::string error; + auto const expectedError = + "Invalid reduce_relay" + " vp_base_squelch_max_selected_peers must be " + "greater than or equal to 3"; + try + { + c2.loadFromString(toLoad); + } + catch (std::runtime_error& e) + { + error = e.what(); + } + + BEAST_EXPECT(error == expectedError); + }); + } + + void + testBaseSquelchReady(bool log) + { + doTest("BaseSquelchReady", log, [&](bool log) { + ManualClock::reset(); + auto createSlots = [&](bool baseSquelchEnabled) + -> reduce_relay::Slots { + env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = + baseSquelchEnabled; + return reduce_relay::Slots( + env_.app().logs(), network_.overlay(), env_.app().config()); + }; + // base squelching must not be ready if squelching is disabled + BEAST_EXPECT(!createSlots(false).baseSquelchReady()); + + // base squelch must not be ready as not enough time passed from + // bootup + BEAST_EXPECT(!createSlots(true).baseSquelchReady()); + + ManualClock::advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1}); + + // base squelch enabled and bootup time passed + BEAST_EXPECT(createSlots(true).baseSquelchReady()); + + // even if time passed, base squelching must not be ready if turned + // off in the config + BEAST_EXPECT(!createSlots(false).baseSquelchReady()); }); } @@ -1425,7 +1567,7 @@ vp_squelched=1 auto run = [&](int npeers) { handler.maxDuration_ = 0; reduce_relay::Slots slots( - env_.app().logs(), handler); + env_.app().logs(), handler, env_.app().config()); // 1st message from a new peer switches the slot // to counting state and resets the counts of all peers + // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold @@ -1503,14 +1645,12 @@ vp_squelched=1 std::stringstream str; str << "[reduce_relay]\n" << "vp_enable=" << enable << "\n" - << "vp_squelch=" << enable << "\n" << "[compression]\n" << "1\n"; c.loadFromString(str.str()); - env_.app().config().VP_REDUCE_RELAY_ENABLE = - c.VP_REDUCE_RELAY_ENABLE; - env_.app().config().VP_REDUCE_RELAY_SQUELCH = - c.VP_REDUCE_RELAY_SQUELCH; + env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = + c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE; + env_.app().config().COMPRESSION = c.COMPRESSION; }; auto handshake = [&](int outboundEnable, int inboundEnable) { @@ -1523,7 +1663,7 @@ vp_squelched=1 env_.app().config().COMPRESSION, false, env_.app().config().TX_REDUCE_RELAY_ENABLE, - env_.app().config().VP_REDUCE_RELAY_ENABLE); + env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); @@ -1563,7 +1703,13 @@ vp_squelched=1 Network network_; public: - reduce_relay_test() : env_(*this), network_(env_.app()) + reduce_relay_test() + : env_(*this, jtx::envconfig([](std::unique_ptr cfg) { + cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true; + cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6; + return cfg; + })) + , network_(env_.app()) { } @@ -1582,6 +1728,7 @@ public: testInternalHashRouter(log); testRandomSquelch(log); testHandshake(log); + testBaseSquelchReady(log); } }; diff --git a/src/xrpld/core/Config.h b/src/xrpld/core/Config.h index 5f4f752be..53ae25ea9 100644 --- a/src/xrpld/core/Config.h +++ b/src/xrpld/core/Config.h @@ -258,19 +258,18 @@ public: // size, but we allow admins to explicitly set it in the config. std::optional SWEEP_INTERVAL; - // Reduce-relay - these parameters are experimental. - // Enable reduce-relay features - // Validation/proposal reduce-relay feature - bool VP_REDUCE_RELAY_ENABLE = false; - // Send squelch message to peers. Generally this config should - // have the same value as VP_REDUCE_RELAY_ENABLE. It can be - // used for testing the feature's function without - // affecting the message relaying. To use it for testing, - // set it to false and set VP_REDUCE_RELAY_ENABLE to true. - // Squelch messages will not be sent to the peers in this case. - // Set log level to debug so that the feature function can be - // analyzed. - bool VP_REDUCE_RELAY_SQUELCH = false; + // Reduce-relay - Experimental parameters to control p2p routing algorithms + + // Enable base squelching of duplicate validation/proposal messages + bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = false; + + ///////////////////// !!TEMPORARY CODE BLOCK!! //////////////////////// + // Temporary squelching config for the peers selected as a source of // + // validator messages. The config must be removed once squelching is // + // made the default routing algorithm // + std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5; + ///////////////// END OF TEMPORARY CODE BLOCK ///////////////////// + // Transaction reduce-relay feature bool TX_REDUCE_RELAY_ENABLE = false; // If tx reduce-relay feature is disabled diff --git a/src/xrpld/core/detail/Config.cpp b/src/xrpld/core/detail/Config.cpp index b1cc86b4f..890101029 100644 --- a/src/xrpld/core/detail/Config.cpp +++ b/src/xrpld/core/detail/Config.cpp @@ -737,8 +737,44 @@ Config::loadFromString(std::string const& fileContents) if (exists(SECTION_REDUCE_RELAY)) { auto sec = section(SECTION_REDUCE_RELAY); - VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false); - VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false); + + ///////////////////// !!TEMPORARY CODE BLOCK!! //////////////////////// + // vp_enable config option is deprecated by vp_base_squelch_enable // + // This option is kept for backwards compatibility. When squelching // + // is the default algorithm, it must be replaced with: // + // VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = // + // sec.value_or("vp_base_squelch_enable", true); // + if (sec.exists("vp_base_squelch_enable") && sec.exists("vp_enable")) + Throw( + "Invalid " SECTION_REDUCE_RELAY + " cannot specify both vp_base_squelch_enable and vp_enable " + "options. " + "vp_enable was deprecated and replaced by " + "vp_base_squelch_enable"); + + if (sec.exists("vp_base_squelch_enable")) + VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = + sec.value_or("vp_base_squelch_enable", false); + else if (sec.exists("vp_enable")) + VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = + sec.value_or("vp_enable", false); + else + VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = false; + ///////////////// !!END OF TEMPORARY CODE BLOCK!! ///////////////////// + + ///////////////////// !!TEMPORARY CODE BLOCK!! /////////////////////// + // Temporary squelching config for the peers selected as a source of // + // validator messages. The config must be removed once squelching is // + // made the default routing algorithm. // + VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = + sec.value_or("vp_base_squelch_max_selected_peers", 5); + if (VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS < 3) + Throw( + "Invalid " SECTION_REDUCE_RELAY + " vp_base_squelch_max_selected_peers must be " + "greater than or equal to 3"); + ///////////////// !!END OF TEMPORARY CODE BLOCK!! ///////////////////// + TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false); TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false); TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20); @@ -747,9 +783,9 @@ Config::loadFromString(std::string const& fileContents) TX_REDUCE_RELAY_MIN_PEERS < 10) Throw( "Invalid " SECTION_REDUCE_RELAY - ", tx_min_peers must be greater or equal to 10" - ", tx_relay_percentage must be greater or equal to 10 " - "and less or equal to 100"); + ", tx_min_peers must be greater than or equal to 10" + ", tx_relay_percentage must be greater than or equal to 10 " + "and less than or equal to 100"); } if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_)) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 6ae3c9a14..0956eb06f 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED #define RIPPLE_OVERLAY_SLOT_H_INCLUDED +#include #include #include @@ -32,7 +33,6 @@ #include #include -#include #include #include #include @@ -109,16 +109,25 @@ private: using id_t = Peer::id_t; using time_point = typename clock_type::time_point; + // a callback to report ignored squelches + using ignored_squelch_callback = std::function; + /** Constructor * @param journal Journal for logging * @param handler Squelch/Unsquelch implementation + * @param maxSelectedPeers the maximum number of peers to be selected as + * validator message source */ - Slot(SquelchHandler const& handler, beast::Journal journal) + Slot( + SquelchHandler const& handler, + beast::Journal journal, + uint16_t maxSelectedPeers) : reachedThreshold_(0) , lastSelected_(clock_type::now()) , state_(SlotState::Counting) , handler_(handler) , journal_(journal) + , maxSelectedPeers_(maxSelectedPeers) { } @@ -129,7 +138,7 @@ private: * slot's state to Counting. If the number of messages for the peer is > * MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the * number of considered peers who reached MAX_MESSAGE_THRESHOLD is - * MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from + * maxSelectedPeers_ then randomly select maxSelectedPeers_ from * considered peers, and call squelch handler for each peer, which is not * selected and not already in Squelched state. Set the state for those * peers to Squelched and reset the count of all peers. Set slot's state to @@ -139,9 +148,14 @@ private: * @param id Peer id which received the message * @param type Message type (Validation and Propose Set only, * others are ignored, future use) + * @param callback A callback to report ignored squelches */ void - update(PublicKey const& validator, id_t id, protocol::MessageType type); + update( + PublicKey const& validator, + id_t id, + protocol::MessageType type, + ignored_squelch_callback callback); /** Handle peer deletion when a peer disconnects. * If the peer is in Selected state then @@ -223,17 +237,26 @@ private: time_point expire; // squelch expiration time time_point lastMessage; // time last message received }; + std::unordered_map peers_; // peer's data + // pool of peers considered as the source of messages // from validator - peers that reached MIN_MESSAGE_THRESHOLD std::unordered_set considered_; + // number of peers that reached MAX_MESSAGE_THRESHOLD std::uint16_t reachedThreshold_; + // last time peers were selected, used to age the slot typename clock_type::time_point lastSelected_; + SlotState state_; // slot's state SquelchHandler const& handler_; // squelch/unsquelch handler beast::Journal const journal_; // logging + + // the maximum number of peers that should be selected as a validator + // message source + uint16_t const maxSelectedPeers_; }; template @@ -264,7 +287,8 @@ void Slot::update( PublicKey const& validator, id_t id, - protocol::MessageType type) + protocol::MessageType type, + ignored_squelch_callback callback) { using namespace std::chrono; auto now = clock_type::now(); @@ -302,6 +326,10 @@ Slot::update( peer.lastMessage = now; + // report if we received a message from a squelched peer + if (peer.state == PeerState::Squelched) + callback(); + if (state_ != SlotState::Counting || peer.state == PeerState::Squelched) return; @@ -319,17 +347,17 @@ Slot::update( return; } - if (reachedThreshold_ == MAX_SELECTED_PEERS) + if (reachedThreshold_ == maxSelectedPeers_) { - // Randomly select MAX_SELECTED_PEERS peers from considered. + // Randomly select maxSelectedPeers_ peers from considered. // Exclude peers that have been idling > IDLED - // it's possible that deleteIdlePeer() has not been called yet. - // If number of remaining peers != MAX_SELECTED_PEERS + // If number of remaining peers != maxSelectedPeers_ // then reset the Counting state and let deleteIdlePeer() handle // idled peers. std::unordered_set selected; auto const consideredPoolSize = considered_.size(); - while (selected.size() != MAX_SELECTED_PEERS && considered_.size() != 0) + while (selected.size() != maxSelectedPeers_ && considered_.size() != 0) { auto i = considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1); @@ -347,7 +375,7 @@ Slot::update( selected.insert(id); } - if (selected.size() != MAX_SELECTED_PEERS) + if (selected.size() != maxSelectedPeers_) { JLOG(journal_.trace()) << "update: selection failed " << Slice(validator) << " " << id; @@ -364,7 +392,7 @@ Slot::update( << *std::next(s, 1) << " " << *std::next(s, 2); XRPL_ASSERT( - peers_.size() >= MAX_SELECTED_PEERS, + peers_.size() >= maxSelectedPeers_, "ripple::reduce_relay::Slot::update : minimum peers"); // squelch peers which are not selected and @@ -382,7 +410,7 @@ Slot::update( str << k << " "; v.state = PeerState::Squelched; std::chrono::seconds duration = - getSquelchDuration(peers_.size() - MAX_SELECTED_PEERS); + getSquelchDuration(peers_.size() - maxSelectedPeers_); v.expire = now + duration; handler_.squelch(validator, k, duration.count()); } @@ -544,15 +572,41 @@ class Slots final public: /** - * @param app Applicaton reference + * @param logs reference to the logger * @param handler Squelch/unsquelch implementation + * @param config reference to the global config */ - Slots(Logs& logs, SquelchHandler const& handler) - : handler_(handler), logs_(logs), journal_(logs.journal("Slots")) + Slots(Logs& logs, SquelchHandler const& handler, Config const& config) + : handler_(handler) + , logs_(logs) + , journal_(logs.journal("Slots")) + , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE) + , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS) { } ~Slots() = default; - /** Calls Slot::update of Slot associated with the validator. + + /** Check if base squelching feature is enabled and ready */ + bool + baseSquelchReady() + { + return baseSquelchEnabled_ && reduceRelayReady(); + } + + /** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */ + bool + reduceRelayReady() + { + if (!reduceRelayReady_) + reduceRelayReady_ = + reduce_relay::epoch(clock_type::now()) > + reduce_relay::WAIT_ON_BOOTUP; + + return reduceRelayReady_; + } + + /** Calls Slot::update of Slot associated with the validator, with a noop + * callback. * @param key Message's hash * @param validator Validator's public key * @param id Peer's id which received the message @@ -563,7 +617,25 @@ public: uint256 const& key, PublicKey const& validator, id_t id, - protocol::MessageType type); + protocol::MessageType type) + { + updateSlotAndSquelch(key, validator, id, type, []() {}); + } + + /** Calls Slot::update of Slot associated with the validator. + * @param key Message's hash + * @param validator Validator's public key + * @param id Peer's id which received the message + * @param type Received protocol message type + * @param callback A callback to report ignored validations + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + id_t id, + protocol::MessageType type, + typename Slot::ignored_squelch_callback callback); /** Check if peers stopped relaying messages * and if slots stopped receiving messages from the validator. @@ -651,10 +723,16 @@ private: bool addPeerMessage(uint256 const& key, id_t id); + std::atomic_bool reduceRelayReady_{false}; + hash_map> slots_; SquelchHandler const& handler_; // squelch/unsquelch handler Logs& logs_; beast::Journal const journal_; + + bool const baseSquelchEnabled_; + uint16_t const maxSelectedPeers_; + // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message // is aged after IDLED seconds. A message received IDLED seconds @@ -702,7 +780,8 @@ Slots::updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, id_t id, - protocol::MessageType type) + protocol::MessageType type, + typename Slot::ignored_squelch_callback callback) { if (!addPeerMessage(key, id)) return; @@ -712,15 +791,17 @@ Slots::updateSlotAndSquelch( { JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator); - auto it = slots_ - .emplace(std::make_pair( - validator, - Slot(handler_, logs_.journal("Slot")))) - .first; - it->second.update(validator, id, type); + auto it = + slots_ + .emplace(std::make_pair( + validator, + Slot( + handler_, logs_.journal("Slot"), maxSelectedPeers_))) + .first; + it->second.update(validator, id, type, callback); } else - it->second.update(validator, id, type); + it->second.update(validator, id, type, callback); } template diff --git a/src/xrpld/overlay/detail/ConnectAttempt.cpp b/src/xrpld/overlay/detail/ConnectAttempt.cpp index 30763b135..84fbd36d3 100644 --- a/src/xrpld/overlay/detail/ConnectAttempt.cpp +++ b/src/xrpld/overlay/detail/ConnectAttempt.cpp @@ -209,7 +209,7 @@ ConnectAttempt::onHandshake(error_code ec) app_.config().COMPRESSION, app_.config().LEDGER_REPLAY, app_.config().TX_REDUCE_RELAY_ENABLE, - app_.config().VP_REDUCE_RELAY_ENABLE); + app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE); buildHandshake( req_, diff --git a/src/xrpld/overlay/detail/Handshake.cpp b/src/xrpld/overlay/detail/Handshake.cpp index 5c8bd6cb0..30f8c7faa 100644 --- a/src/xrpld/overlay/detail/Handshake.cpp +++ b/src/xrpld/overlay/detail/Handshake.cpp @@ -417,7 +417,7 @@ makeResponse( app.config().COMPRESSION, app.config().LEDGER_REPLAY, app.config().TX_REDUCE_RELAY_ENABLE, - app.config().VP_REDUCE_RELAY_ENABLE)); + app.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)); buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app); diff --git a/src/xrpld/overlay/detail/Handshake.h b/src/xrpld/overlay/detail/Handshake.h index 37f138b88..1cd733ef5 100644 --- a/src/xrpld/overlay/detail/Handshake.h +++ b/src/xrpld/overlay/detail/Handshake.h @@ -139,7 +139,7 @@ makeResponse( // compression feature static constexpr char FEATURE_COMPR[] = "compr"; -// validation/proposal reduce-relay feature +// validation/proposal reduce-relay base squelch feature static constexpr char FEATURE_VPRR[] = "vprr"; // transaction reduce-relay feature static constexpr char FEATURE_TXRR[] = "txrr"; @@ -221,7 +221,7 @@ peerFeatureEnabled( @param txReduceRelayEnabled if true then transaction reduce-relay feature is enabled @param vpReduceRelayEnabled if true then validation/proposal reduce-relay - feature is enabled + base squelch feature is enabled @return X-Protocol-Ctl header value */ std::string @@ -241,8 +241,7 @@ makeFeaturesRequestHeader( @param txReduceRelayEnabled if true then transaction reduce-relay feature is enabled @param vpReduceRelayEnabled if true then validation/proposal reduce-relay - feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled + base squelch feature is enabled @return X-Protocol-Ctl header value */ std::string diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 7cf447e16..2ff300cd9 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -147,7 +147,7 @@ OverlayImpl::OverlayImpl( , m_resolver(resolver) , next_id_(1) , timer_count_(0) - , slots_(app.logs(), *this) + , slots_(app.logs(), *this, app.config()) , m_stats( std::bind(&OverlayImpl::collect_metrics, this), collector, @@ -1388,8 +1388,7 @@ makeSquelchMessage( void OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const { - if (auto peer = findPeerByShortID(id); - peer && app_.config().VP_REDUCE_RELAY_SQUELCH) + if (auto peer = findPeerByShortID(id); peer) { // optimize - multiple message with different // validator might be sent to the same peer @@ -1403,8 +1402,7 @@ OverlayImpl::squelch( Peer::id_t id, uint32_t squelchDuration) const { - if (auto peer = findPeerByShortID(id); - peer && app_.config().VP_REDUCE_RELAY_SQUELCH) + if (auto peer = findPeerByShortID(id); peer) { peer->send(makeSquelchMessage(validator, true, squelchDuration)); } @@ -1417,6 +1415,9 @@ OverlayImpl::updateSlotAndSquelch( std::set&& peers, protocol::MessageType type) { + if (!slots_.baseSquelchReady()) + return; + if (!strand_.running_in_this_thread()) return post( strand_, @@ -1425,7 +1426,9 @@ OverlayImpl::updateSlotAndSquelch( }); for (auto id : peers) - slots_.updateSlotAndSquelch(key, validator, id, type); + slots_.updateSlotAndSquelch(key, validator, id, type, [&]() { + reportInboundTraffic(TrafficCount::squelch_ignored, 0); + }); } void @@ -1435,12 +1438,17 @@ OverlayImpl::updateSlotAndSquelch( Peer::id_t peer, protocol::MessageType type) { + if (!slots_.baseSquelchReady()) + return; + if (!strand_.running_in_this_thread()) return post(strand_, [this, key, validator, peer, type]() { updateSlotAndSquelch(key, validator, peer, type); }); - slots_.updateSlotAndSquelch(key, validator, peer, type); + slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() { + reportInboundTraffic(TrafficCount::squelch_ignored, 0); + }); } void diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index a6e4ad690..2a391439b 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -114,20 +114,21 @@ PeerImp::PeerImp( headers_, FEATURE_TXRR, app_.config().TX_REDUCE_RELAY_ENABLE)) - , vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE) , ledgerReplayEnabled_(peerFeatureEnabled( headers_, FEATURE_LEDGER_REPLAY, app_.config().LEDGER_REPLAY)) , ledgerReplayMsgHandler_(app, app.getLedgerReplayer()) { - JLOG(journal_.info()) << "compression enabled " - << (compressionEnabled_ == Compressed::On) - << " vp reduce-relay enabled " - << vpReduceRelayEnabled_ - << " tx reduce-relay enabled " - << txReduceRelayEnabled_ << " on " << remote_address_ - << " " << id_; + JLOG(journal_.info()) + << "compression enabled " << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay base squelch enabled " + << peerFeatureEnabled( + headers_, + FEATURE_VPRR, + app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE) + << " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on " + << remote_address_ << " " << id_; } PeerImp::~PeerImp() @@ -1743,8 +1744,7 @@ PeerImp::onMessage(std::shared_ptr const& m) { // Count unique messages (Slots has it's own 'HashRouter'), which a peer // receives within IDLED seconds since the message has been relayed. - if (reduceRelayReady() && relayed && - (stopwatch().now() - *relayed) < reduce_relay::IDLED) + if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) overlay_.updateSlotAndSquelch( suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); @@ -2391,10 +2391,8 @@ PeerImp::onMessage(std::shared_ptr const& m) { // Count unique messages (Slots has it's own 'HashRouter'), which a // peer receives within IDLED seconds since the message has been - // relayed. Wait WAIT_ON_BOOTUP time to let the server establish - // connections to peers. - if (reduceRelayReady() && relayed && - (stopwatch().now() - *relayed) < reduce_relay::IDLED) + // relayed. + if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) overlay_.updateSlotAndSquelch( key, val->getSignerPublic(), id_, protocol::mtVALIDATION); @@ -3024,7 +3022,7 @@ PeerImp::checkPropose( // as part of the squelch logic. auto haveMessage = app_.overlay().relay( *packet, peerPos.suppressionID(), peerPos.publicKey()); - if (reduceRelayReady() && !haveMessage.empty()) + if (!haveMessage.empty()) overlay_.updateSlotAndSquelch( peerPos.suppressionID(), peerPos.publicKey(), @@ -3059,7 +3057,7 @@ PeerImp::checkValidation( // as part of the squelch logic. auto haveMessage = overlay_.relay(*packet, key, val->getSignerPublic()); - if (reduceRelayReady() && !haveMessage.empty()) + if (!haveMessage.empty()) { overlay_.updateSlotAndSquelch( key, @@ -3525,16 +3523,6 @@ PeerImp::isHighLatency() const return latency_ >= peerHighLatency; } -bool -PeerImp::reduceRelayReady() -{ - if (!reduceRelayReady_) - reduceRelayReady_ = - reduce_relay::epoch(UptimeClock::now()) > - reduce_relay::WAIT_ON_BOOTUP; - return vpReduceRelayEnabled_ && reduceRelayReady_; -} - void PeerImp::Metrics::add_message(std::uint64_t bytes) { diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 8fbafa1ee..ecd3fc7f6 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -116,7 +116,6 @@ private: clock_type::time_point const creationTime_; reduce_relay::Squelch squelch_; - inline static std::atomic_bool reduceRelayReady_{false}; // Notes on thread locking: // @@ -190,9 +189,7 @@ private: hash_set txQueue_; // true if tx reduce-relay feature is enabled on the peer. bool txReduceRelayEnabled_ = false; - // true if validation/proposal reduce-relay feature is enabled - // on the peer. - bool vpReduceRelayEnabled_ = false; + bool ledgerReplayEnabled_ = false; LedgerReplayMsgHandler ledgerReplayMsgHandler_; @@ -521,11 +518,6 @@ private: handleHaveTransactions( std::shared_ptr const& m); - // Check if reduce-relay feature is enabled and - // reduce_relay::WAIT_ON_BOOTUP time passed since the start - bool - reduceRelayReady(); - public: //-------------------------------------------------------------------------- // @@ -705,7 +697,6 @@ PeerImp::PeerImp( headers_, FEATURE_TXRR, app_.config().TX_REDUCE_RELAY_ENABLE)) - , vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE) , ledgerReplayEnabled_(peerFeatureEnabled( headers_, FEATURE_LEDGER_REPLAY, @@ -714,13 +705,15 @@ PeerImp::PeerImp( { read_buffer_.commit(boost::asio::buffer_copy( read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers)); - JLOG(journal_.info()) << "compression enabled " - << (compressionEnabled_ == Compressed::On) - << " vp reduce-relay enabled " - << vpReduceRelayEnabled_ - << " tx reduce-relay enabled " - << txReduceRelayEnabled_ << " on " << remote_address_ - << " " << id_; + JLOG(journal_.info()) + << "compression enabled " << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay base squelch enabled " + << peerFeatureEnabled( + headers_, + FEATURE_VPRR, + app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE) + << " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on " + << remote_address_ << " " << id_; } template diff --git a/src/xrpld/overlay/detail/TrafficCount.h b/src/xrpld/overlay/detail/TrafficCount.h index e93163683..8dc02def5 100644 --- a/src/xrpld/overlay/detail/TrafficCount.h +++ b/src/xrpld/overlay/detail/TrafficCount.h @@ -109,6 +109,8 @@ public: squelch, squelch_suppressed, // egress traffic amount suppressed by squelching + squelch_ignored, // the traffic amount that came from peers ignoring + // squelch messages // TMHaveSet message: get_set, // transaction sets we try to get @@ -262,6 +264,7 @@ public: {validatorlist, "validator_lists"}, {squelch, "squelch"}, {squelch_suppressed, "squelch_suppressed"}, + {squelch_ignored, "squelch_ignored"}, {get_set, "set_get"}, {share_set, "set_share"}, {ld_tsc_get, "ledger_data_Transaction_Set_candidate_get"}, @@ -326,6 +329,7 @@ protected: {validatorlist, {validatorlist}}, {squelch, {squelch}}, {squelch_suppressed, {squelch_suppressed}}, + {squelch_ignored, {squelch_ignored}}, {get_set, {get_set}}, {share_set, {share_set}}, {ld_tsc_get, {ld_tsc_get}},