From 9b9f34f8818721711d56689701e5d07632a94d50 Mon Sep 17 00:00:00 2001 From: Gregory Tsipenyuk Date: Sun, 12 Apr 2020 10:03:03 -0400 Subject: [PATCH] Optimize relaying of validation and proposal messages: With few exceptions, servers will typically receive multiple copies of any given message from their directly connected peers. For servers with several peers this can impact processing latency and force them to do redundant work. Proposal and validation messages are often relayed with extremely high redundancy. This commit, if merged, introduces experimental code that attempts to optimize the relaying of proposals and validations by allowing servers to instruct their peers to "squelch" delivery of selected proposals and validations. Servers make squelching decisions by a process that evaluates the fitness and performance of a given peer and randomly selects a subset of the best candidates. The experimental code is presently disabled and must be explicitly enabled by server operators who wish to test it. --- Builds/CMake/RippledCore.cmake | 1 + bin/ci/ubuntu/build-and-test.sh | 1 + src/ripple/app/consensus/RCLConsensus.cpp | 2 +- src/ripple/app/misc/HashRouter.cpp | 12 +- src/ripple/app/misc/HashRouter.h | 23 +- src/ripple/core/Config.h | 6 + src/ripple/core/ConfigSections.h | 1 + src/ripple/core/impl/Config.cpp | 7 + src/ripple/overlay/Message.h | 16 +- src/ripple/overlay/Overlay.h | 30 +- src/ripple/overlay/Slot.h | 734 +++++++++++ src/ripple/overlay/Squelch.h | 122 ++ src/ripple/overlay/SquelchCommon.h | 52 + src/ripple/overlay/impl/Message.cpp | 6 +- src/ripple/overlay/impl/OverlayImpl.cpp | 123 +- src/ripple/overlay/impl/OverlayImpl.h | 68 +- src/ripple/overlay/impl/PeerImp.cpp | 108 +- src/ripple/overlay/impl/PeerImp.h | 5 + src/ripple/overlay/impl/ProtocolMessage.h | 6 + src/ripple/overlay/impl/Tuning.h | 3 + src/ripple/proto/ripple.proto | 8 + src/test/app/HashRouter_test.cpp | 2 +- src/test/overlay/reduce_relay_test.cpp | 1414 +++++++++++++++++++++ 23 files changed, 2714 insertions(+), 36 deletions(-) create mode 100644 src/ripple/overlay/Slot.h create mode 100644 src/ripple/overlay/Squelch.h create mode 100644 src/ripple/overlay/SquelchCommon.h create mode 100644 src/test/overlay/reduce_relay_test.cpp diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 97d48bde8a..ceda0df4bc 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -867,6 +867,7 @@ target_sources (rippled PRIVATE src/test/overlay/cluster_test.cpp src/test/overlay/short_read_test.cpp src/test/overlay/compression_test.cpp + src/test/overlay/reduce_relay_test.cpp #[===============================[ test sources: subdir: peerfinder diff --git a/bin/ci/ubuntu/build-and-test.sh b/bin/ci/ubuntu/build-and-test.sh index 7ffad801dd..0b77406c94 100755 --- a/bin/ci/ubuntu/build-and-test.sh +++ b/bin/ci/ubuntu/build-and-test.sh @@ -136,6 +136,7 @@ else # ORDER matters here...sorted in approximately # descending execution time (longest running tests at top) declare -a manual_tests=( + 'ripple.ripple_data.reduce_relay_simulate' 'ripple.ripple_data.digest' 'ripple.tx.Offer_manual' 'ripple.app.PayStrandAllPairs' diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 9b5ae65cd4..fd61ae2bf4 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -157,7 +157,7 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos) auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size()); - app_.overlay().relay(prop, peerPos.suppressionID()); + app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey()); } void diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index ea47450826..8a8170f486 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -49,12 +49,18 @@ HashRouter::addSuppression(uint256 const& key) bool HashRouter::addSuppressionPeer(uint256 const& key, PeerShortID peer) +{ + return addSuppressionPeerWithStatus(key, peer).first; +} + +std::pair<bool, std::optional<Stopwatch::time_point>> +HashRouter::addSuppressionPeerWithStatus(const uint256& key, PeerShortID peer) { std::lock_guard lock(mutex_); auto result = emplace(key); result.first.addPeer(peer); - return result.second; + return {result.second, result.first.relayed()}; } bool @@ -110,14 +116,14 @@ HashRouter::setFlags(uint256 const& key, int flags) auto HashRouter::shouldRelay(uint256 const& key) - -> boost::optional<std::set<PeerShortID>> + -> std::optional<std::set<PeerShortID>> { std::lock_guard lock(mutex_); auto& s = emplace(key).first; if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_)) - return boost::none; + return {}; return s.releasePeerSet(); } diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index 9038cf2922..76cf4431ec 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -97,6 +97,13 @@ private: return std::move(peers_); } + /** Return the seated relay time point if the message has been relayed */ + std::optional<Stopwatch::time_point> + relayed() const + { + return relayed_; + } + /** Determines if this item should be relayed. Checks whether the item has been recently relayed. @@ -142,8 +149,8 @@ private: std::set<PeerShortID> peers_; // This could be generalized to a map, if more // than one flag needs to expire independently. - boost::optional<Stopwatch::time_point> relayed_; - boost::optional<Stopwatch::time_point> processed_; + std::optional<Stopwatch::time_point> relayed_; + std::optional<Stopwatch::time_point> processed_; std::uint32_t recoveries_ = 0; }; @@ -185,6 +192,14 @@ public: bool addSuppressionPeer(uint256 const& key, PeerShortID peer); + /** Add a suppression peer and get the message's relay status. + * Return pair: + * element 1: true if the peer is added. + * element 2: optional seated to the relay time point, or + * unseated if the message has not been relayed yet. */ + std::pair<bool, std::optional<Stopwatch::time_point>> + addSuppressionPeerWithStatus(uint256 const& key, PeerShortID peer); + bool addSuppressionPeer(uint256 const& key, PeerShortID peer, int& flags); @@ -214,11 +229,11 @@ public: return `true` again until the hold time has expired. The internal set of peers will also be reset. - @return A `boost::optional` set of peers which do not need to be + @return A `std::optional` set of peers which do not need to be relayed to. If the result is uninitialized, the item should _not_ be relayed. */ - boost::optional<std::set<PeerShortID>> + std::optional<std::set<PeerShortID>> shouldRelay(uint256 const& key); /** Determines whether the hashed item should be recovered diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 7eec3bc076..4bf26a6141 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -179,6 +179,12 @@ public: // Thread pool configuration std::size_t WORKERS = 0; + // Reduce-relay - these parameters are experimental.
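+ // They are read from the [reduce_relay] stanza of rippled.cfg (see + // Config.cpp below); a hypothetical operator configuration, with + // illustrative values, is: + // + // [reduce_relay] + // enable=1 + // squelch=1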
+ // Enable reduce-relay functionality + bool REDUCE_RELAY_ENABLE = false; + // Send squelch message to peers + bool REDUCE_RELAY_SQUELCH = false; + // These override the command line client settings boost::optional rpc_ip; diff --git a/src/ripple/core/ConfigSections.h b/src/ripple/core/ConfigSections.h index 3aae9774d1..313a759c6f 100644 --- a/src/ripple/core/ConfigSections.h +++ b/src/ripple/core/ConfigSections.h @@ -70,6 +70,7 @@ struct ConfigSection #define SECTION_PATH_SEARCH_MAX "path_search_max" #define SECTION_PEER_PRIVATE "peer_private" #define SECTION_PEERS_MAX "peers_max" +#define SECTION_REDUCE_RELAY "reduce_relay" #define SECTION_RELAY_PROPOSALS "relay_proposals" #define SECTION_RELAY_VALIDATIONS "relay_validations" #define SECTION_RPC_STARTUP "rpc_startup" diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index a7aeca8617..bdaa47b28e 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -481,6 +481,13 @@ Config::loadFromString(std::string const& fileContents) if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_)) COMPRESSION = beast::lexicalCastThrow(strTemp); + if (exists(SECTION_REDUCE_RELAY)) + { + auto sec = section(SECTION_REDUCE_RELAY); + REDUCE_RELAY_ENABLE = sec.value_or("enable", false); + REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false); + } + if (getSingleSection( secConfig, SECTION_AMENDMENT_MAJORITY_TIME, strTemp, j_)) { diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index e2c081d123..724fad07e6 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -21,6 +21,7 @@ #define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED #include +#include #include #include #include @@ -55,8 +56,13 @@ public: /** Constructor * @param message Protocol message to serialize * @param type Protocol message type + * @param validator Public Key of the source validator for Validation or + * Proposal message. Used to check if the message should be squelched. */ - Message(::google::protobuf::Message const& message, int type); + Message( + ::google::protobuf::Message const& message, + int type, + boost::optional const& validator = {}); /** Retrieve the packed message data. If compressed message is requested but * the message is not compressible then the uncompressed buffer is returned. @@ -74,11 +80,19 @@ public: return category_; } + /** Get the validator's key */ + boost::optional const& + getValidatorKey() const + { + return validatorKey_; + } + private: std::vector buffer_; std::vector bufferCompressed_; std::size_t category_; std::once_flag once_flag_; + boost::optional validatorKey_; /** Set the payload header * @param in Pointer to the payload diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index d5f0c4bae2..fd1b9ca34f 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -133,7 +133,7 @@ public: /** Returns the peer with the matching short id, or null. */ virtual std::shared_ptr - findPeerByShortID(Peer::id_t const& id) = 0; + findPeerByShortID(Peer::id_t const& id) const = 0; /** Returns the peer with the matching public key, or null. */ virtual std::shared_ptr @@ -147,13 +147,29 @@ public: virtual void broadcast(protocol::TMValidation& m) = 0; - /** Relay a proposal. */ - virtual void - relay(protocol::TMProposeSet& m, uint256 const& uid) = 0; + /** Relay a proposal. 
+ * @param m the serialized proposal + * @param uid the id used to identify this proposal + * @param validator The pubkey of the validator that issued this proposal + * @return the set of peers which have already sent us this proposal + */ + virtual std::set<Peer::id_t> + relay( + protocol::TMProposeSet& m, + uint256 const& uid, + PublicKey const& validator) = 0; - /** Relay a validation. */ - virtual void - relay(protocol::TMValidation& m, uint256 const& uid) = 0; + /** Relay a validation. + * @param m the serialized validation + * @param uid the id used to identify this validation + * @param validator The pubkey of the validator that issued this validation + * @return the set of peers which have already sent us this validation + */ + virtual std::set<Peer::id_t> + relay( + protocol::TMValidation& m, + uint256 const& uid, + PublicKey const& validator) = 0; /** Visit every active peer. * diff --git a/src/ripple/overlay/Slot.h b/src/ripple/overlay/Slot.h new file mode 100644 index 0000000000..57d1c137a1 --- /dev/null +++ b/src/ripple/overlay/Slot.h @@ -0,0 +1,734 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED +#define RIPPLE_OVERLAY_SLOT_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +namespace squelch { + +template <typename clock_type> +class Slots; + +/** Peer's State */ +enum class PeerState : uint8_t { + Counting, // counting messages + Selected, // selected to relay, counting if Slot in Counting + Squelched, // squelched, doesn't relay +}; +/** Slot's State */ +enum class SlotState : uint8_t { + Counting, // counting messages + Selected, // peers selected, stop counting +}; + +template <typename Unit, typename TP> +Unit +epoch(TP const& t) +{ + return duration_cast<Unit>(t.time_since_epoch()); +} + +/** Abstract class. Declares squelch and unsquelch handlers. + * OverlayImpl inherits from this class. The motivation is + * to make it easy for unit tests to change the + * callbacks on the fly.
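+ * + * For example, a unit test can supply a minimal mock implementation + * (hypothetical sketch, not part of this patch): + * + * struct TestHandler : public SquelchHandler + * { + * void + * squelch(PublicKey const&, Peer::id_t, std::uint32_t) const override + * { + * } + * void + * unsquelch(PublicKey const&, Peer::id_t) const override + * { + * } + * };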
*/ +class SquelchHandler +{ +public: + virtual ~SquelchHandler() + { + } + /** Squelch handler + * @param validator Public key of the source validator + * @param id Peer's id to squelch + * @param duration Squelch duration in seconds + */ + virtual void + squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration) + const = 0; + /** Unsquelch handler + * @param validator Public key of the source validator + * @param id Peer's id to unsquelch + */ + virtual void + unsquelch(PublicKey const& validator, Peer::id_t id) const = 0; +}; + +/** + * Slot is associated with a specific validator via the validator's public key. + * Slot counts messages from a validator, selects peers to be the source + * of the messages, and communicates the peers to be squelched. Slot can be + * in the following states: 1) Counting. This is the peer selection state, + * in which Slot counts the messages and selects the peers; 2) Selected. Slot + * doesn't count messages in the Selected state. A message received from an + * unsquelched, disconnected, or idling peer may transition Slot back to the + * Counting state. + */ +template <typename clock_type> +class Slot final +{ +private: + friend class Slots<clock_type>; + using id_t = Peer::id_t; + using time_point = typename clock_type::time_point; + + /** Constructor + * @param handler Squelch/Unsquelch implementation + * @param journal Journal for logging + */ + Slot(SquelchHandler const& handler, beast::Journal journal) + : reachedThreshold_(0) + , lastSelected_(clock_type::now()) + , state_(SlotState::Counting) + , handler_(handler) + , journal_(journal) + { + } + + /** Update peer info. If the message is from a new + * peer, or from a previously squelched peer whose squelch has expired, + * then switch the peer's and slot's state to Counting. If the time since + * the last selection round is > 2 * MAX_UNSQUELCH_EXPIRE then switch the + * slot's state to Counting. If the number of messages for the peer + * is > MIN_MESSAGE_THRESHOLD then add the peer to the considered peers + * pool. Once the number of considered peers that reached + * MAX_MESSAGE_THRESHOLD is MAX_SELECTED_PEERS, randomly select + * MAX_SELECTED_PEERS from the considered peers, and call the squelch + * handler for each peer that is not selected and not already in the + * Squelched state. Set the state for those peers to Squelched and reset + * the count of all peers. Set the slot's state to Selected. The message + * count is not updated when the slot is in the Selected state. + * @param validator Public key of the source validator + * @param id Peer id which received the message + * @param type Message type (Validation and Propose Set only, + * others are ignored, future use) + */ + void + update(PublicKey const& validator, id_t id, protocol::MessageType type); + + /** Handle peer deletion when a peer disconnects. + * If the peer is in the Selected state then + * call the unsquelch handler for every peer in the Squelched state and + * reset every peer's state to Counting. Switch the Slot's state to + * Counting. + * @param validator Public key of the source validator + * @param id Deleted peer id + * @param erase If true then erase the peer. The peer is not erased + * when it is idled.
The peer is deleted when it + * disconnects. + */ + void + deletePeer(PublicKey const& validator, id_t id, bool erase); + + /** Get the time of the last peer selection round */ + const time_point& + getLastSelected() const + { + return lastSelected_; + } + + /** Return number of peers in state */ + std::uint16_t + inState(PeerState state) const; + + /** Return number of peers not in state */ + std::uint16_t + notInState(PeerState state) const; + + /** Return Slot's state */ + SlotState + getState() const + { + return state_; + } + + /** Return selected peers */ + std::set<id_t> + getSelected() const; + + /** Get peers info. Return map of peer's state, count, squelch + * expiration in milliseconds, and last message time in milliseconds. + */ + std::unordered_map< + id_t, + std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>> + getPeers() const; + + /** Check if peers stopped relaying messages. If an idled peer is a + * selected peer then call the unsquelch handler for all + * currently squelched peers and switch the slot to + * the Counting state. + * @param validator Public key of the source validator + */ + void + deleteIdlePeer(PublicKey const& validator); + +private: + /** Reset counts of peers in Selected or Counting state */ + void + resetCounts(); + + /** Initialize slot to Counting state */ + void + initCounting(); + + /** Data maintained for each peer */ + struct PeerInfo + { + PeerState state; // peer's state + std::size_t count; // message count + time_point expire; // squelch expiration time + time_point lastMessage; // time last message received + }; + std::unordered_map<id_t, PeerInfo> peers_; // peer's data + // pool of peers considered as the source of messages + // from validator - peers that reached MIN_MESSAGE_THRESHOLD + std::unordered_set<id_t> considered_; + // number of peers that reached MAX_MESSAGE_THRESHOLD + std::uint16_t reachedThreshold_; + // last time peers were selected, used to age the slot + typename clock_type::time_point lastSelected_; + SlotState state_; // slot's state + SquelchHandler const& handler_; // squelch/unsquelch handler + beast::Journal const journal_; // logging +}; + +template <typename clock_type> +void +Slot<clock_type>::deleteIdlePeer(PublicKey const& validator) +{ + auto now = clock_type::now(); + for (auto it = peers_.begin(); it != peers_.end();) + { + auto& peer = it->second; + auto id = it->first; + ++it; + if (now - peer.lastMessage > IDLED) + { + JLOG(journal_.debug()) + << "deleteIdlePeer: " << Slice(validator) << " " << id + << " idled " + << duration_cast<milliseconds>(now - peer.lastMessage).count() + << " selected " << (peer.state == PeerState::Selected); + deletePeer(validator, id, false); + } + } +} + +template <typename clock_type> +void +Slot<clock_type>::update( + PublicKey const& validator, + id_t id, + protocol::MessageType type) +{ + auto now = clock_type::now(); + auto it = peers_.find(id); + // First message from this peer + if (it == peers_.end()) + { + JLOG(journal_.debug()) + << "update: adding peer " << Slice(validator) << " " << id; + peers_.emplace( + std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now})); + initCounting(); + return; + } + // Message from a peer with expired squelch + if (it->second.state == PeerState::Squelched && now > it->second.expire) + { + JLOG(journal_.debug()) + << "update: squelch expired " << Slice(validator) << " " << id; + it->second.state = PeerState::Counting; + it->second.lastMessage = now; + initCounting(); + return; + } + + auto& peer = it->second; + + JLOG(journal_.debug()) + << "update: existing peer " << Slice(validator) << " " << id + << " slot state " << static_cast<int>(state_) << " peer state " + << static_cast<int>(peer.state) << " count " << peer.count << " last " + <<
duration_cast(now - peer.lastMessage).count() + << " pool " << considered_.size() << " threshold " << reachedThreshold_ + << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal"); + + peer.lastMessage = now; + + if (state_ != SlotState::Counting || peer.state == PeerState::Squelched) + return; + + if (++peer.count > MIN_MESSAGE_THRESHOLD) + considered_.insert(id); + if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) + ++reachedThreshold_; + + if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE) + { + JLOG(journal_.debug()) + << "update: resetting due to inactivity " << Slice(validator) << " " + << id << " " << duration_cast(now - lastSelected_).count(); + initCounting(); + return; + } + + if (reachedThreshold_ == MAX_SELECTED_PEERS) + { + // Randomly select MAX_SELECTED_PEERS peers from considered. + // Exclude peers that have been idling > IDLED - + // it's possible that deleteIdlePeer() has not been called yet. + // If number of remaining peers != MAX_SELECTED_PEERS + // then reset the Counting state and let deleteIdlePeer() handle + // idled peers. + std::unordered_set selected; + auto const consideredPoolSize = considered_.size(); + while (selected.size() != MAX_SELECTED_PEERS && considered_.size() != 0) + { + auto i = + considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1); + auto it = std::next(considered_.begin(), i); + auto id = *it; + considered_.erase(it); + auto const& itpeers = peers_.find(id); + if (itpeers == peers_.end()) + { + JLOG(journal_.error()) << "update: peer not found " + << Slice(validator) << " " << id; + continue; + } + if (now - itpeers->second.lastMessage < IDLED) + selected.insert(id); + } + + if (selected.size() != MAX_SELECTED_PEERS) + { + JLOG(journal_.debug()) + << "update: selection failed " << Slice(validator) << " " << id; + initCounting(); + return; + } + + lastSelected_ = now; + + auto s = selected.begin(); + JLOG(journal_.debug()) + << "update: " << Slice(validator) << " " << id << " pool size " + << consideredPoolSize << " selected " << *s << " " + << *std::next(s, 1) << " " << *std::next(s, 2); + + // squelch peers which are not selected and + // not already squelched + std::stringstream str; + for (auto& [k, v] : peers_) + { + v.count = 0; + + if (selected.find(k) != selected.end()) + v.state = PeerState::Selected; + else if (v.state != PeerState::Squelched) + { + if (journal_.debug()) + str << k << " "; + v.state = PeerState::Squelched; + auto duration = Squelch::getSquelchDuration(); + v.expire = now + duration; + handler_.squelch( + validator, + k, + duration_cast(duration).count()); + } + } + JLOG(journal_.debug()) << "update: squelching " << Slice(validator) + << " " << id << " " << str.str(); + considered_.clear(); + reachedThreshold_ = 0; + state_ = SlotState::Selected; + } +} + +template +void +Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) +{ + auto it = peers_.find(id); + if (it != peers_.end()) + { + JLOG(journal_.debug()) + << "deletePeer: " << Slice(validator) << " " << id << " selected " + << (it->second.state == PeerState::Selected) << " considered " + << (considered_.find(id) != considered_.end()) << " erase " + << erase; + auto now = clock_type::now(); + if (it->second.state == PeerState::Selected) + { + for (auto& [k, v] : peers_) + { + if (v.state == PeerState::Squelched) + handler_.unsquelch(validator, k); + v.state = PeerState::Counting; + v.count = 0; + v.expire = now; + } + + considered_.clear(); + reachedThreshold_ = 0; + state_ = SlotState::Counting; + } + else if (considered_.find(id) 
!= considered_.end()) + { + if (it->second.count > MAX_MESSAGE_THRESHOLD) + --reachedThreshold_; + considered_.erase(id); + } + + it->second.lastMessage = now; + it->second.count = 0; + + if (erase) + peers_.erase(it); + } +} + +template <typename clock_type> +void +Slot<clock_type>::resetCounts() +{ + for (auto& [_, peer] : peers_) + { + (void)_; + peer.count = 0; + } +} + +template <typename clock_type> +void +Slot<clock_type>::initCounting() +{ + state_ = SlotState::Counting; + considered_.clear(); + reachedThreshold_ = 0; + resetCounts(); +} + +template <typename clock_type> +std::uint16_t +Slot<clock_type>::inState(PeerState state) const +{ + return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { + return (it.second.state == state); + }); +} + +template <typename clock_type> +std::uint16_t +Slot<clock_type>::notInState(PeerState state) const +{ + return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { + return (it.second.state != state); + }); +} + +template <typename clock_type> +std::set<typename Peer::id_t> +Slot<clock_type>::getSelected() const +{ + std::set<id_t> init; + return std::accumulate( + peers_.begin(), peers_.end(), init, [](auto& init, auto const& it) { + if (it.second.state == PeerState::Selected) + { + init.insert(it.first); + return init; + } + return init; + }); +} + +template <typename clock_type> +std::unordered_map< + typename Peer::id_t, + std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>> +Slot<clock_type>::getPeers() const +{ + auto init = std::unordered_map< + id_t, + std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>(); + return std::accumulate( + peers_.begin(), peers_.end(), init, [](auto& init, auto const& it) { + init.emplace(std::make_pair( + it.first, + std::move(std::make_tuple( + it.second.state, + it.second.count, + epoch<milliseconds>(it.second.expire).count(), + epoch<milliseconds>(it.second.lastMessage).count())))); + return init; + }); +} + +/** Slots is a container for the validators' Slot objects. It handles the + * Slot update when a message is received from a validator, handles Slot + * aging, and checks for peers that are disconnected or have stopped + * relaying messages. + */ +template <typename clock_type> +class Slots final +{ + using time_point = typename clock_type::time_point; + using id_t = typename Peer::id_t; + using messages = beast::aged_unordered_map< + uint256, + std::unordered_set<Peer::id_t>, + clock_type, + hardened_hash<strong_hash>>; + +public: + /** + * @param app Application reference + * @param handler Squelch/unsquelch implementation + */ + Slots(Application& app, SquelchHandler const& handler) + : handler_(handler), app_(app), journal_(app.journal("Slots")) + { + } + ~Slots() = default; + /** Calls Slot::update of the Slot associated with the validator. + * @param key Message's hash + * @param validator Validator's public key + * @param id Peer's id which received the message + * @param type Received protocol message type + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + id_t id, + protocol::MessageType type); + + /** Check if peers stopped relaying messages + * and if slots stopped receiving messages from the validator.
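+ * Driven by OverlayImpl's one-second timer, which invokes this check + * every Tuning::checkIdlePeers (4) seconds.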
+ */ + void + deleteIdlePeers(); + + /** Return number of peers in state */ + std::optional + inState(PublicKey const& validator, PeerState state) const + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.inState(state); + return {}; + } + + /** Return number of peers not in state */ + std::optional + notInState(PublicKey const& validator, PeerState state) const + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.notInState(state); + return {}; + } + + /** Return true if Slot is in state */ + bool + inState(PublicKey const& validator, SlotState state) const + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.state_ == state; + return false; + } + + /** Get selected peers */ + std::set + getSelected(PublicKey const& validator) + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.getSelected(); + return {}; + } + + /** Get peers info. Return map of peer's state, count, and squelch + * expiration milliseconds. + */ + std::unordered_map< + typename Peer::id_t, + std::tuple> + getPeers(PublicKey const& validator) + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.getPeers(); + return {}; + } + + /** Get Slot's state */ + std::optional + getState(PublicKey const& validator) + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.getState(); + return {}; + } + + /** Called when a peer is deleted. If the peer was selected to be the + * source of messages from the validator then squelched peers have to be + * unsquelched. + * @param id Peer's id + * @param erase If true then erase the peer + */ + void + deletePeer(id_t id, bool erase); + +private: + /** Add message/peer if have not seen this message + * from the peer. A message is aged after IDLED seconds. + * Return true if added */ + bool + addPeerMessage(uint256 const& key, id_t id); + + hash_map> slots_; + SquelchHandler const& handler_; // squelch/unsquelch handler + Application& app_; + beast::Journal const journal_; + // Maintain aged container of message/peers. This is required + // to discard duplicate message from the same peer. A message + // is aged after IDLED seconds. A message received IDLED seconds + // after it was relayed is ignored by PeerImp. 
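+ // Being inline static, a single container is shared by all Slots + // objects instantiated with the same clock type.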
+ inline static messages peersWithMessage_{ + beast::get_abstract_clock()}; +}; + +template +bool +Slots::addPeerMessage(uint256 const& key, id_t id) +{ + beast::expire(peersWithMessage_, squelch::IDLED); + + if (key.isNonZero()) + { + auto it = peersWithMessage_.find(key); + if (it == peersWithMessage_.end()) + { + JLOG(journal_.trace()) + << "addPeerMessage: new " << to_string(key) << " " << id; + peersWithMessage_.emplace(key, std::unordered_set{id}); + return true; + } + + if (it->second.find(id) != it->second.end()) + { + JLOG(journal_.trace()) << "addPeerMessage: duplicate message " + << to_string(key) << " " << id; + return false; + } + + JLOG(journal_.trace()) + << "addPeerMessage: added " << to_string(key) << " " << id; + + it->second.insert(id); + } + + return true; +} + +template +void +Slots::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + id_t id, + protocol::MessageType type) +{ + if (!addPeerMessage(key, id)) + return; + + auto it = slots_.find(validator); + if (it == slots_.end()) + { + JLOG(journal_.debug()) + << "updateSlotAndSquelch: new slot " << Slice(validator); + auto it = slots_ + .emplace(std::make_pair( + validator, + Slot(handler_, app_.journal("Slot")))) + .first; + it->second.update(validator, id, type); + } + else + it->second.update(validator, id, type); +} + +template +void +Slots::deletePeer(id_t id, bool erase) +{ + for (auto& [validator, slot] : slots_) + slot.deletePeer(validator, id, erase); +} + +template +void +Slots::deleteIdlePeers() +{ + auto now = clock_type::now(); + + for (auto it = slots_.begin(); it != slots_.end();) + { + it->second.deleteIdlePeer(it->first); + if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE) + { + JLOG(journal_.debug()) + << "deleteIdlePeers: deleting idle slot " << Slice(it->first); + it = slots_.erase(it); + } + else + ++it; + } +} + +} // namespace squelch + +} // namespace ripple + +#endif // RIPPLE_OVERLAY_SLOT_H_INCLUDED diff --git a/src/ripple/overlay/Squelch.h b/src/ripple/overlay/Squelch.h new file mode 100644 index 0000000000..ab2fa65fba --- /dev/null +++ b/src/ripple/overlay/Squelch.h @@ -0,0 +1,122 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED +#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED + +#include +#include +#include + +#include +#include + +namespace ripple { + +namespace squelch { + +/** Maintains squelching of relaying messages from validators */ +template +class Squelch +{ + using time_point = typename clock_type::time_point; + +public: + Squelch() = default; + virtual ~Squelch() = default; + + /** Squelch/Unsquelch relaying for the validator + * @param validator The validator's public key + * @param squelch Squelch/unsquelch flag + * @param squelchDuration Squelch duration time if squelch is true + */ + void + squelch(PublicKey const& validator, bool squelch, uint64_t squelchDuration); + + /** Are the messages to this validator squelched + * @param validator Validator's public key + * @return true if squelched + */ + bool + isSquelched(PublicKey const& validator); + + /** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and + * MAX_UNSQUELCH_EXPIRE */ + static seconds + getSquelchDuration(); + +private: + /** Maintains the list of squelched relaying to downstream peers. + * Expiration time is included in the TMSquelch message. */ + hash_map squelched_; +}; + +template +void +Squelch::squelch( + PublicKey const& validator, + bool squelch, + uint64_t squelchDuration) +{ + if (squelch) + { + squelched_[validator] = [squelchDuration]() { + seconds duration = seconds(squelchDuration); + return clock_type::now() + + ((duration >= MIN_UNSQUELCH_EXPIRE && + duration <= MAX_UNSQUELCH_EXPIRE) + ? duration + : getSquelchDuration()); + }(); + } + else + squelched_.erase(validator); +} + +template +bool +Squelch::isSquelched(PublicKey const& validator) +{ + auto now = clock_type::now(); + + auto const& it = squelched_.find(validator); + if (it == squelched_.end()) + return false; + else if (it->second > now) + return true; + + squelched_.erase(it); + + return false; +} + +template +seconds +Squelch::getSquelchDuration() +{ + auto d = seconds(ripple::rand_int( + MIN_UNSQUELCH_EXPIRE.count(), MAX_UNSQUELCH_EXPIRE.count())); + return d; +} + +} // namespace squelch + +} // namespace ripple + +#endif // RIPPLED_SQUELCH_H diff --git a/src/ripple/overlay/SquelchCommon.h b/src/ripple/overlay/SquelchCommon.h new file mode 100644 index 0000000000..dea68a72d2 --- /dev/null +++ b/src/ripple/overlay/SquelchCommon.h @@ -0,0 +1,52 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED +#define RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED +#include + +namespace ripple { + +namespace squelch { + +using namespace std::chrono; + +// Peer's squelch is limited in time to +// rand{MIN_UNSQUELCH_EXPIRE, MAX_UNSQUELCH_EXPIRE} +static constexpr seconds MIN_UNSQUELCH_EXPIRE = seconds{300}; +static constexpr seconds MAX_UNSQUELCH_EXPIRE = seconds{600}; +// No message received threshold before identifying a peer as idled +static constexpr seconds IDLED = seconds{8}; +// Message count threshold to start selecting peers as the source +// of messages from the validator. We add peers who reach +// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS +// reach MAX_MESSAGE_THRESHOLD. +static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9; +static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10; +// Max selected peers to choose as the source of messages from validator +static constexpr uint16_t MAX_SELECTED_PEERS = 3; +// Wait before reduce-relay feature is enabled on boot up to let +// the server establish peer connections +static constexpr minutes WAIT_ON_BOOTUP = minutes{10}; + +} // namespace squelch + +} // namespace ripple + +#endif // RIPPLED_SQUELCHCOMMON_H diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index 29440c44e4..b5b24c3d3e 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -23,8 +23,12 @@ namespace ripple { -Message::Message(::google::protobuf::Message const& message, int type) +Message::Message( + ::google::protobuf::Message const& message, + int type, + boost::optional const& validator) : category_(TrafficCount::categorize(message, type, false)) + , validatorKey_(validator) { using namespace ripple::compression; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 9e8d58bc7e..af8c90711e 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -103,6 +103,9 @@ OverlayImpl::Timer::on_timer(error_code ec) if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0) overlay_.check(); + if ((overlay_.timer_count_ % Tuning::checkIdlePeers) == 0) + overlay_.deleteIdlePeers(); + timer_.expires_from_now(std::chrono::seconds(1)); timer_.async_wait(overlay_.strand_.wrap(std::bind( &Timer::on_timer, shared_from_this(), std::placeholders::_1))); @@ -139,6 +142,7 @@ OverlayImpl::OverlayImpl( , m_resolver(resolver) , next_id_(1) , timer_count_(0) + , slots_(app, *this) , m_stats( std::bind(&OverlayImpl::collect_metrics, this), collector, @@ -1216,7 +1220,7 @@ OverlayImpl::check() } std::shared_ptr -OverlayImpl::findPeerByShortID(Peer::id_t const& id) +OverlayImpl::findPeerByShortID(Peer::id_t const& id) const { std::lock_guard lock(mutex_); auto const iter = ids_.find(id); @@ -1249,18 +1253,23 @@ OverlayImpl::broadcast(protocol::TMProposeSet& m) for_each([&](std::shared_ptr&& p) { p->send(sm); }); } -void -OverlayImpl::relay(protocol::TMProposeSet& m, uint256 const& uid) +std::set +OverlayImpl::relay( + protocol::TMProposeSet& m, + uint256 const& uid, + PublicKey const& validator) { if (auto const toSkip = app_.getHashRouter().shouldRelay(uid)) { auto const sm = - std::make_shared(m, protocol::mtPROPOSE_LEDGER); + std::make_shared(m, protocol::mtPROPOSE_LEDGER, validator); for_each([&](std::shared_ptr&& p) { if (toSkip->find(p->id()) == toSkip->end()) p->send(sm); }); + return 
*toSkip; } + return {}; } void @@ -1270,17 +1279,23 @@ OverlayImpl::broadcast(protocol::TMValidation& m) for_each([sm](std::shared_ptr&& p) { p->send(sm); }); } -void -OverlayImpl::relay(protocol::TMValidation& m, uint256 const& uid) +std::set +OverlayImpl::relay( + protocol::TMValidation& m, + uint256 const& uid, + PublicKey const& validator) { if (auto const toSkip = app_.getHashRouter().shouldRelay(uid)) { - auto const sm = std::make_shared(m, protocol::mtVALIDATION); + auto const sm = + std::make_shared(m, protocol::mtVALIDATION, validator); for_each([&](std::shared_ptr&& p) { if (toSkip->find(p->id()) == toSkip->end()) p->send(sm); }); + return *toSkip; } + return {}; } //------------------------------------------------------------------------------ @@ -1352,6 +1367,100 @@ OverlayImpl::sendEndpoints() } } +std::shared_ptr +makeSquelchMessage( + PublicKey const& validator, + bool squelch, + uint64_t squelchDuration) +{ + protocol::TMSquelch m; + m.set_squelch(squelch); + m.set_validatorpubkey(validator.data(), validator.size()); + if (squelch) + m.set_squelchduration(squelchDuration); + return std::make_shared(m, protocol::mtSQUELCH); +} + +void +OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const +{ + if (auto peer = findPeerByShortID(id); + peer && app_.config().REDUCE_RELAY_SQUELCH) + { + // optimize - multiple message with different + // validator might be sent to the same peer + auto m = makeSquelchMessage(validator, false, 0); + peer->send(m); + } +} + +void +OverlayImpl::squelch( + PublicKey const& validator, + Peer::id_t id, + uint32_t squelchDuration) const +{ + if (auto peer = findPeerByShortID(id); + peer && app_.config().REDUCE_RELAY_SQUELCH) + { + auto m = makeSquelchMessage(validator, true, squelchDuration); + peer->send(m); + } +} + +void +OverlayImpl::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + std::set&& peers, + protocol::MessageType type) +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, + [this, key, validator, peers = std::move(peers), type]() mutable { + updateSlotAndSquelch(key, validator, std::move(peers), type); + }); + + for (auto id : peers) + slots_.updateSlotAndSquelch(key, validator, id, type); +} + +void +OverlayImpl::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer, + protocol::MessageType type) +{ + if (!strand_.running_in_this_thread()) + return post(strand_, [this, key, validator, peer, type]() { + updateSlotAndSquelch(key, validator, peer, type); + }); + + slots_.updateSlotAndSquelch(key, validator, peer, type); +} + +void +OverlayImpl::deletePeer(Peer::id_t id) +{ + if (!strand_.running_in_this_thread()) + return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id)); + + slots_.deletePeer(id, true); +} + +void +OverlayImpl::deleteIdlePeers() +{ + if (!strand_.running_in_this_thread()) + return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this)); + + slots_.deleteIdlePeers(); +} + +//------------------------------------------------------------------------------ + Overlay::Setup setup_Overlay(BasicConfig const& config) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 099b90556d..abbae3c925 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -52,7 +53,7 @@ namespace ripple { class PeerImp; class BasicConfig; -class OverlayImpl : public Overlay 
+class OverlayImpl : public Overlay, public squelch::SquelchHandler { public: class Child @@ -124,6 +125,8 @@ private: boost::optional networkID_; + squelch::Slots slots_; + //-------------------------------------------------------------------------- public: @@ -195,7 +198,7 @@ public: void checkSanity(std::uint32_t) override; std::shared_ptr - findPeerByShortID(Peer::id_t const& id) override; + findPeerByShortID(Peer::id_t const& id) const override; std::shared_ptr findPeerByPublicKey(PublicKey const& pubKey) override; @@ -206,11 +209,17 @@ public: void broadcast(protocol::TMValidation& m) override; - void - relay(protocol::TMProposeSet& m, uint256 const& uid) override; + std::set + relay( + protocol::TMProposeSet& m, + uint256 const& uid, + PublicKey const& validator) override; - void - relay(protocol::TMValidation& m, uint256 const& uid) override; + std::set + relay( + protocol::TMValidation& m, + uint256 const& uid, + PublicKey const& validator) override; //-------------------------------------------------------------------------- // @@ -364,7 +373,49 @@ public: void lastLink(std::uint32_t id); + /** Updates message count for validator/peer. Sends TMSquelch if the number + * of messages for N peers reaches threshold T. A message is counted + * if a peer receives the message for the first time and if + * the message has been relayed. + * @param key Unique message's key + * @param validator Validator's public key + * @param peers Peers' id to update the slots for + * @param type Received protocol message type + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + std::set&& peers, + protocol::MessageType type); + + /** Overload to reduce allocation in case of single peer + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer, + protocol::MessageType type); + + /** Called when the peer is deleted. If the peer was selected to be the + * source of messages from the validator then squelched peers have to be + * unsquelched. 
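+ * Called from PeerImp's destructor when a peer connection is destroyed.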
+ * @param id Peer's id + */ + void + deletePeer(Peer::id_t id); + private: + void + squelch( + PublicKey const& validator, + Peer::id_t const id, + std::uint32_t squelchDuration) const override; + + void + unsquelch(PublicKey const& validator, Peer::id_t id) const override; + std::shared_ptr makeRedirectResponse( std::shared_ptr const& slot, @@ -481,6 +532,11 @@ private: void sendEndpoints(); + /** Check if peers stopped relaying messages + * and if slots stopped receiving messages from the validator */ + void + deleteIdlePeers(); + private: struct TrafficGauges { diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index b3c00894d8..3e0c1ae26c 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -100,6 +100,7 @@ PeerImp::~PeerImp() { const bool inCluster{cluster()}; + overlay_.deletePeer(id_); if (state_ == State::active) overlay_.onPeerDeactivate(id_); overlay_.peerFinder().on_closed(slot_); @@ -226,6 +227,10 @@ PeerImp::send(std::shared_ptr const& m) if (detaching_) return; + auto validator = m->getValidatorKey(); + if (validator && squelch_.isSquelched(*validator)) + return; + overlay_.reportTraffic( safe_cast(m->getCategory()), false, @@ -1647,8 +1652,20 @@ PeerImp::onMessage(std::shared_ptr const& m) publicKey.slice(), sig); - if (!app_.getHashRouter().addSuppressionPeer(suppression, id_)) + if (auto [added, relayed] = + app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_); + !added) { + // Count unique messages (Slots has it's own 'HashRouter'), which a peer + // receives within IDLED seconds since the message has been relayed. + // Wait WAIT_ON_BOOTUP time to let the server establish connections to + // peers. + if (app_.config().REDUCE_RELAY_ENABLE && relayed && + (stopwatch().now() - *relayed) < squelch::IDLED && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + overlay_.updateSlotAndSquelch( + suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); JLOG(p_journal_.trace()) << "Proposal: duplicate"; return; } @@ -2146,9 +2163,21 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } - if (!app_.getHashRouter().addSuppressionPeer( - sha512Half(makeSlice(m->validation())), id_)) + auto key = sha512Half(makeSlice(m->validation())); + if (auto [added, relayed] = + app_.getHashRouter().addSuppressionPeerWithStatus(key, id_); + !added) { + // Count unique messages (Slots has it's own 'HashRouter'), which a + // peer receives within IDLED seconds since the message has been + // relayed. Wait WAIT_ON_BOOTUP time to let the server establish + // connections to peers. + if (app_.config().REDUCE_RELAY_ENABLE && (bool)relayed && + (stopwatch().now() - *relayed) < squelch::IDLED && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + overlay_.updateSlotAndSquelch( + key, val->getSignerPublic(), id_, protocol::mtVALIDATION); JLOG(p_journal_.trace()) << "Validation: duplicate"; return; } @@ -2324,6 +2353,45 @@ PeerImp::onMessage(std::shared_ptr const& m) } } +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (!m->has_validatorpubkey()) + { + charge(Resource::feeBadData); + return; + } + auto validator = m->validatorpubkey(); + auto const slice{makeSlice(validator)}; + if (!publicKeyType(slice)) + { + charge(Resource::feeBadData); + return; + } + PublicKey key(slice); + auto squelch = m->squelch(); + auto duration = m->has_squelchduration() ? m->squelchduration() : 0; + auto sp = shared_from_this(); + + // Ignore the squelch for validator's own messages. 
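+ // (honoring a squelch for this server's own key would suppress relaying + // of its own proposals and validations, so the request is discarded)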
+ if (key == app_.getValidationPublicKey()) + { + JLOG(p_journal_.debug()) + << "onMessage: TMSquelch discarding validator's squelch " << slice; + return; + } + + if (!strand_.running_in_this_thread()) + return post(strand_, [sp, key, squelch, duration]() { + sp->squelch_.squelch(key, squelch, duration); + }); + + JLOG(p_journal_.debug()) + << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; + + squelch_.squelch(key, squelch, duration); +} + //-------------------------------------------------------------------------- void @@ -2478,7 +2546,22 @@ PeerImp::checkPropose( relay = app_.config().RELAY_UNTRUSTED_PROPOSALS || cluster(); if (relay) - app_.overlay().relay(*packet, peerPos.suppressionID()); + { + // haveMessage contains peers, which are suppressed; i.e. the peers + // are the source of the message, consequently the message should + // not be relayed to these peers. But the message must be counted + // as part of the squelch logic. + auto haveMessage = app_.overlay().relay( + *packet, peerPos.suppressionID(), peerPos.publicKey()); + if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + overlay_.updateSlotAndSquelch( + peerPos.suppressionID(), + peerPos.publicKey(), + std::move(haveMessage), + protocol::mtPROPOSE_LEDGER); + } } void @@ -2501,7 +2584,22 @@ PeerImp::checkValidation( { auto const suppression = sha512Half(makeSlice(val->getSerialized())); - overlay_.relay(*packet, suppression); + // haveMessage contains peers, which are suppressed; i.e. the peers + // are the source of the message, consequently the message should + // not be relayed to these peers. But the message must be counted + // as part of the squelch logic. + auto haveMessage = + overlay_.relay(*packet, suppression, val->getSignerPublic()); + if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + { + overlay_.updateSlotAndSquelch( + suppression, + val->getSignerPublic(), + std::move(haveMessage), + protocol::mtVALIDATION); + } } } catch (std::exception const&) diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 4b279dea65..e9937cdb06 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -142,6 +143,8 @@ private: clock_type::time_point lastPingTime_; clock_type::time_point const creationTime_; + squelch::Squelch squelch_; + // Notes on thread locking: // // During an audit it was noted that some member variables that looked @@ -549,6 +552,8 @@ public: onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); private: State diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index b929f91ba4..eb7dd18f09 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -77,6 +77,8 @@ protocolMessageName(int type) return "validation"; case protocol::mtGET_OBJECTS: return "get_objects"; + case protocol::mtSQUELCH: + return "squelch"; default: break; } @@ -375,6 +377,10 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler) success = detail::invoke( *header, buffers, handler); break; + case protocol::mtSQUELCH: + success = + detail::invoke(*header, buffers, handler); + break; default: handler.onMessageUnknown(header->message_type); 
success = true; diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index 0920c77e3a..bd199fc0a7 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -71,6 +71,9 @@ enum { /** How often to log send queue size */ sendQueueLogFreq = 64, + + /** How often we check for idle peers (seconds) */ + checkIdlePeers = 4, }; /** The threshold above which we treat a peer connection as high latency */ diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 6038dac10a..fea849d89c 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -23,6 +23,7 @@ enum MessageType mtGET_PEER_SHARD_INFO = 52; mtPEER_SHARD_INFO = 53; mtVALIDATORLIST = 54; + mtSQUELCH = 55; } // token, iterations, target, challenge = issue demand for proof of work @@ -356,3 +357,10 @@ message TMPing optional uint64 netTime = 4; } +message TMSquelch +{ + required bool squelch = 1; // squelch if true, otherwise unsquelch + required bytes validatorPubKey = 2; // validator's public key + optional uint32 squelchDuration = 3; // squelch duration in milliseconds +} + diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 4e5c7e2696..9162fb9b1e 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -191,7 +191,7 @@ class HashRouter_test : public beast::unit_test::suite uint256 const key1(1); - boost::optional> peers; + std::optional> peers; peers = router.shouldRelay(key1); BEAST_EXPECT(peers && peers->empty()); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp new file mode 100644 index 0000000000..5e8f05d63b --- /dev/null +++ b/src/test/overlay/reduce_relay_test.cpp @@ -0,0 +1,1414 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace ripple { + +namespace test { + +using namespace std::chrono; + +class Link; + +using MessageSPtr = std::shared_ptr; +using LinkSPtr = std::shared_ptr; +using PeerSPtr = std::shared_ptr; +using PeerWPtr = std::weak_ptr; +using SquelchCB = + std::function; +using UnsquelchCB = std::function; +using LinkIterCB = std::function; + +static constexpr std::uint32_t MAX_PEERS = 10; +static constexpr std::uint32_t MAX_VALIDATORS = 10; +static constexpr std::uint32_t MAX_MESSAGES = 200000; + +/** Simulate two entities - peer directly connected to the server + * (via squelch in PeerSim) and PeerImp (via Overlay) + */ +class PeerPartial : public Peer +{ +public: + virtual ~PeerPartial() + { + } + virtual void + onMessage(MessageSPtr const& m, SquelchCB f) = 0; + virtual void + onMessage(protocol::TMSquelch const& squelch) = 0; + void + send(protocol::TMSquelch const& squelch) + { + onMessage(squelch); + } + + // dummy implementation + void + send(std::shared_ptr const& m) override + { + } + beast::IP::Endpoint + getRemoteAddress() const override + { + return {}; + } + void + charge(Resource::Charge const& fee) override + { + } + bool + cluster() const override + { + return false; + } + bool + isHighLatency() const override + { + return false; + } + int + getScore(bool) const override + { + return 0; + } + PublicKey const& + getNodePublic() const override + { + static PublicKey key{}; + return key; + } + Json::Value + json() override + { + return {}; + } + bool + supportsFeature(ProtocolFeature f) const override + { + return false; + } + boost::optional + publisherListSequence(PublicKey const&) const override + { + return {}; + } + void + setPublisherListSequence(PublicKey const&, std::size_t const) override + { + } + uint256 const& + getClosedLedgerHash() const override + { + static uint256 hash{}; + return hash; + } + bool + hasLedger(uint256 const& hash, std::uint32_t seq) const override + { + return false; + } + void + ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const override + { + } + bool + hasShard(std::uint32_t shardIndex) const override + { + return false; + } + bool + hasTxSet(uint256 const& hash) const override + { + return false; + } + void + cycleStatus() override + { + } + bool + hasRange(std::uint32_t uMin, std::uint32_t uMax) override + { + return false; + } + bool + compressionEnabled() const override + { + return false; + } +}; + +/** Manually advanced clock. 
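+ * + * Used as the clock_type of the simulated slots so that the tests can + * advance time deterministically; e.g., advancing past + * MAX_UNSQUELCH_EXPIRE (600s) expires any outstanding squelch.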
*/ +class ManualClock +{ +public: + typedef uint64_t rep; + typedef std::milli period; + typedef std::chrono::duration duration; + typedef std::chrono::time_point time_point; + inline static const bool is_steady = false; + + static void + advance(duration d) noexcept + { + now_ += d; + } + + static void + randAdvance(milliseconds min, milliseconds max) + { + now_ += randDuration(min, max); + } + + static void + reset() noexcept + { + now_ = time_point(seconds(0)); + } + + static time_point + now() noexcept + { + return now_; + } + + static duration + randDuration(milliseconds min, milliseconds max) + { + return duration(milliseconds(rand_int(min.count(), max.count()))); + } + + explicit ManualClock() = default; + +private: + inline static time_point now_ = time_point(seconds(0)); +}; + +/** Simulate server's OverlayImpl */ +class Overlay +{ +public: + Overlay() = default; + virtual ~Overlay() = default; + + virtual void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id, + SquelchCB f, + protocol::MessageType type = protocol::mtVALIDATION) = 0; + + virtual void deleteIdlePeers(UnsquelchCB) = 0; + + virtual void deletePeer(Peer::id_t, UnsquelchCB) = 0; +}; + +class Validator; + +/** Simulate link from a validator to a peer directly connected + * to the server. + */ +class Link +{ + using Latency = std::pair; + +public: + Link( + Validator& validator, + PeerSPtr peer, + Latency const& latency = {milliseconds(5), milliseconds(15)}) + : validator_(validator), peer_(peer), latency_(latency), up_(true) + { + auto sp = peer_.lock(); + assert(sp); + } + ~Link() = default; + void + send(MessageSPtr const& m, SquelchCB f) + { + if (!up_) + return; + auto sp = peer_.lock(); + assert(sp); + auto peer = std::dynamic_pointer_cast(sp); + peer->onMessage(m, f); + } + Validator& + validator() + { + return validator_; + } + void + up(bool linkUp) + { + up_ = linkUp; + } + Peer::id_t + peerId() + { + auto p = peer_.lock(); + assert(p); + return p->id(); + } + PeerSPtr + getPeer() + { + auto p = peer_.lock(); + assert(p); + return p; + } + +private: + Validator& validator_; + PeerWPtr peer_; + Latency latency_; + bool up_; +}; + +/** Simulate Validator */ +class Validator +{ + using Links = std::unordered_map; + +public: + Validator() + { + pkey_ = std::get<0>(randomKeyPair(KeyType::ed25519)); + protocol::TMValidation v; + v.set_validation("validation"); + message_ = std::make_shared(v, protocol::mtVALIDATION, pkey_); + id_ = sid_++; + } + Validator(Validator const&) = default; + Validator(Validator&&) = default; + Validator& + operator=(Validator const&) = default; + Validator& + operator=(Validator&&) = default; + ~Validator() + { + clear(); + } + + void + clear() + { + links_.clear(); + } + + static void + resetId() + { + sid_ = 0; + } + + PublicKey const& + key() + { + return pkey_; + } + + operator PublicKey() const + { + return pkey_; + } + + void + addPeer(PeerSPtr peer) + { + links_.emplace( + std::make_pair(peer->id(), std::make_shared(*this, peer))); + } + + void + deletePeer(Peer::id_t id) + { + links_.erase(id); + } + + void + for_links(std::vector peers, LinkIterCB f) + { + for (auto id : peers) + { + assert(links_.find(id) != links_.end()); + f(*links_[id], message_); + } + } + + void + for_links(LinkIterCB f, bool simulateSlow = false) + { + std::vector v; + std::transform( + links_.begin(), links_.end(), std::back_inserter(v), [](auto& kv) { + return kv.second; + }); + std::random_device d; + std::mt19937 g(d()); + std::shuffle(v.begin(), v.end(), g); + 
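+ // iterate over the links in shuffled order so that the delivery order + // of a message to the peers varies from send to send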
+ for (auto& link : v) + { + f(*link, message_); + } + } + + /** Send to specific peers */ + void + send(std::vector peers, SquelchCB f) + { + for_links(peers, [&](Link& link, MessageSPtr m) { link.send(m, f); }); + } + + /** Send to all peers */ + void + send(SquelchCB f) + { + for_links([&](Link& link, MessageSPtr m) { link.send(m, f); }); + } + + MessageSPtr + message() + { + return message_; + } + + std::uint16_t + id() + { + return id_; + } + + void + linkUp(Peer::id_t id) + { + auto it = links_.find(id); + assert(it != links_.end()); + it->second->up(true); + } + + void + linkDown(Peer::id_t id) + { + auto it = links_.find(id); + assert(it != links_.end()); + it->second->up(false); + } + +private: + Links links_; + PublicKey pkey_{}; + MessageSPtr message_ = nullptr; + inline static std::uint16_t sid_ = 0; + std::uint16_t id_ = 0; +}; + +class PeerSim : public PeerPartial, public std::enable_shared_from_this +{ +public: + using id_t = Peer::id_t; + PeerSim(Overlay& overlay) : overlay_(overlay) + { + id_ = sid_++; + } + + ~PeerSim() = default; + + id_t + id() const override + { + return id_; + } + + static void + resetId() + { + sid_ = 0; + } + + /** Local Peer (PeerImp) */ + void + onMessage(MessageSPtr const& m, SquelchCB f) override + { + auto validator = m->getValidatorKey(); + assert(validator); + if (squelch_.isSquelched(*validator)) + return; + + overlay_.updateSlotAndSquelch({}, *validator, id(), f); + } + + /** Remote Peer (Directly connected Peer) */ + virtual void + onMessage(protocol::TMSquelch const& squelch) override + { + auto validator = squelch.validatorpubkey(); + PublicKey key(Slice(validator.data(), validator.size())); + squelch_.squelch(key, squelch.squelch(), squelch.squelchduration()); + } + +private: + inline static id_t sid_ = 0; + id_t id_; + Overlay& overlay_; + squelch::Squelch squelch_; +}; + +class OverlaySim : public Overlay, public squelch::SquelchHandler +{ + using Peers = std::unordered_map; + +public: + using id_t = Peer::id_t; + using clock_type = ManualClock; + OverlaySim(Application& app) : slots_(app, *this) + { + } + + ~OverlaySim() = default; + + void + clear() + { + peers_.clear(); + ManualClock::advance(hours(1)); + slots_.deleteIdlePeers(); + } + + std::uint16_t + inState(PublicKey const& validator, squelch::PeerState state) + { + auto res = slots_.inState(validator, state); + return res ? 
*res : 0; + } + + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id, + SquelchCB f, + protocol::MessageType type = protocol::mtVALIDATION) override + { + squelch_ = f; + slots_.updateSlotAndSquelch(key, validator, id, type); + } + + void + deletePeer(id_t id, UnsquelchCB f) override + { + unsquelch_ = f; + slots_.deletePeer(id, true); + } + + void + deleteIdlePeers(UnsquelchCB f) override + { + unsquelch_ = f; + slots_.deleteIdlePeers(); + } + + PeerSPtr + addPeer(bool useCache = true) + { + PeerSPtr peer{}; + Peer::id_t id; + if (peersCache_.empty() || !useCache) + { + peer = std::make_shared(*this); + id = peer->id(); + } + else + { + auto it = peersCache_.begin(); + peer = it->second; + id = it->first; + peersCache_.erase(it); + } + peers_.emplace(std::make_pair(id, peer)); + return peer; + } + + void + deletePeer(Peer::id_t id, bool useCache = true) + { + auto it = peers_.find(id); + assert(it != peers_.end()); + deletePeer(id, [&](PublicKey const&, PeerWPtr) {}); + if (useCache) + peersCache_.emplace(std::make_pair(id, it->second)); + peers_.erase(it); + } + + void + resetPeers() + { + while (!peers_.empty()) + deletePeer(peers_.begin()->first); + while (!peersCache_.empty()) + addPeer(); + } + + std::optional + deleteLastPeer() + { + if (peers_.empty()) + return {}; + + std::uint8_t maxId = 0; + + for (auto& [id, _] : peers_) + { + (void)_; + if (id > maxId) + maxId = id; + } + + deletePeer(maxId, false); + + return maxId; + } + + bool + isCountingState(PublicKey const& validator) + { + return slots_.inState(validator, squelch::SlotState::Counting); + } + + std::set + getSelected(PublicKey const& validator) + { + return slots_.getSelected(validator); + } + + bool + isSelected(PublicKey const& validator, Peer::id_t peer) + { + auto selected = slots_.getSelected(validator); + return selected.find(peer) != selected.end(); + } + + id_t + getSelectedPeer(PublicKey const& validator) + { + auto selected = slots_.getSelected(validator); + assert(selected.size()); + return *selected.begin(); + } + + std::unordered_map< + id_t, + std::tuple< + squelch::PeerState, + std::uint16_t, + std::uint32_t, + std::uint32_t>> + getPeers(PublicKey const& validator) + { + return slots_.getPeers(validator); + } + + std::uint16_t + getNumPeers() const + { + return peers_.size(); + } + +private: + void + squelch( + PublicKey const& validator, + Peer::id_t id, + std::uint32_t squelchDuration) const override + { + if (auto it = peers_.find(id); it != peers_.end()) + squelch_(validator, it->second, squelchDuration); + } + void + unsquelch(PublicKey const& validator, Peer::id_t id) const override + { + if (auto it = peers_.find(id); it != peers_.end()) + unsquelch_(validator, it->second); + } + SquelchCB squelch_; + UnsquelchCB unsquelch_; + Peers peers_; + Peers peersCache_; + squelch::Slots slots_; +}; + +class Network +{ +public: + Network(Application& app) : overlay_(app) + { + init(); + } + + void + init() + { + validators_.resize(MAX_VALIDATORS); + for (int p = 0; p < MAX_PEERS; p++) + { + auto peer = overlay_.addPeer(); + for (auto& v : validators_) + v.addPeer(peer); + } + } + + ~Network() = default; + + void + reset() + { + validators_.clear(); + overlay_.clear(); + PeerSim::resetId(); + Validator::resetId(); + init(); + } + + Peer::id_t + addPeer() + { + auto peer = overlay_.addPeer(); + for (auto& v : validators_) + v.addPeer(peer); + return peer->id(); + } + + void + deleteLastPeer() + { + auto id = overlay_.deleteLastPeer(); + + if (!id) + return; + + for 
(auto& validator : validators_) + validator.deletePeer(*id); + } + + void + purgePeers() + { + while (overlay_.getNumPeers() > MAX_PEERS) + deleteLastPeer(); + } + + Validator& + validator(std::uint16_t v) + { + assert(v < validators_.size()); + return validators_[v]; + } + + OverlaySim& + overlay() + { + return overlay_; + } + + void + enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable) + { + auto it = + std::find_if(validators_.begin(), validators_.end(), [&](auto& v) { + return v.id() == validatorId; + }); + assert(it != validators_.end()); + if (enable) + it->linkUp(peer); + else + it->linkDown(peer); + } + + void + onDisconnectPeer(Peer::id_t peer) + { + // Send unsquelch to the Peer on all links. This way when + // the Peer "reconnects" it starts sending messages on the link. + // We expect that if a Peer disconnects and then reconnects, it's + // unsquelched. + protocol::TMSquelch squelch; + squelch.set_squelch(false); + for (auto& v : validators_) + { + PublicKey key = v; + squelch.clear_validatorpubkey(); + squelch.set_validatorpubkey(key.data(), key.size()); + v.for_links({peer}, [&](Link& l, MessageSPtr) { + std::dynamic_pointer_cast(l.getPeer())->send(squelch); + }); + } + } + + void + for_rand( + std::uint32_t min, + std::uint32_t max, + std::function f) + { + auto size = max - min; + std::vector s(size); + std::iota(s.begin(), s.end(), min); + std::random_device d; + std::mt19937 g(d()); + std::shuffle(s.begin(), s.end(), g); + for (auto v : s) + f(v); + } + + void + propagate( + LinkIterCB link, + std::uint16_t nValidators = MAX_VALIDATORS, + std::uint32_t nMessages = MAX_MESSAGES, + bool purge = true, + bool resetClock = true) + { + if (resetClock) + ManualClock::reset(); + + if (purge) + { + purgePeers(); + overlay_.resetPeers(); + } + + for (int m = 0; m < nMessages; ++m) + { + ManualClock::randAdvance(milliseconds(1800), milliseconds(2200)); + for_rand(0, nValidators, [&](std::uint32_t v) { + validators_[v].for_links(link); + }); + } + } + + /** Is peer in Selected state in any of the slots */ + bool + isSelected(Peer::id_t id) + { + for (auto& v : validators_) + { + if (overlay_.isSelected(v, id)) + return true; + } + return false; + } + + /** Check if there are peers to unsquelch - peer is in Selected + * state in any of the slots and there are peers in Squelched state + * in those slots. + */ + bool + allCounting(Peer::id_t peer) + { + for (auto& v : validators_) + { + if (!overlay_.isSelected(v, peer)) + continue; + auto peers = overlay_.getPeers(v); + for (auto& [_, v] : peers) + { + (void)_; + if (std::get(v) == + squelch::PeerState::Squelched) + return false; + } + } + return true; + } + +private: + OverlaySim overlay_; + std::vector validators_; +}; + +class reduce_relay_test : public beast::unit_test::suite +{ + using Slot = squelch::Slot; + using id_t = Peer::id_t; + +protected: + void + printPeers(const std::string& msg, std::uint16_t validator = 0) + { + auto peers = network_.overlay().getPeers(network_.validator(validator)); + std::cout << msg << " " + << "num peers " << (int)network_.overlay().getNumPeers() + << std::endl; + for (auto& [k, v] : peers) + std::cout << k << ":" << (int)std::get(v) + << " "; + std::cout << std::endl; + } + + /** Send squelch (if duration is set) or unsquelch (if duration not set) */ + Peer::id_t + sendSquelch( + PublicKey const& validator, + PeerWPtr const& peerPtr, + std::optional duration) + { + protocol::TMSquelch squelch; + bool res = duration ? 
true : false; + squelch.set_squelch(res); + squelch.set_validatorpubkey(validator.data(), validator.size()); + if (res) + squelch.set_squelchduration(*duration); + auto sp = peerPtr.lock(); + assert(sp); + std::dynamic_pointer_cast(sp)->send(squelch); + return sp->id(); + } + + enum State { On, Off, WaitReset }; + enum EventType { LinkDown = 0, PeerDisconnected = 1 }; + // Link down or Peer disconnect event + // TBD - add new peer event + // TBD - add overlapping type of events at any + // time in any quantity + struct Event + { + State state_ = State::Off; + std::uint32_t cnt_ = 0; + std::uint32_t handledCnt_ = 0; + bool isSelected_ = false; + Peer::id_t peer_; + std::uint16_t validator_; + PublicKey key_; + time_point time_; + bool handled_ = false; + }; + + /** Randomly brings the link between a validator and a peer down. + * Randomly disconnects a peer. Those events are generated one at a time. + */ + void + random(bool log) + { + std::unordered_map events{ + {LinkDown, {}}, {PeerDisconnected, {}}}; + time_point lastCheck = ManualClock::now(); + + network_.reset(); + network_.propagate([&](Link& link, MessageSPtr m) { + auto& validator = link.validator(); + auto now = ManualClock::now(); + + bool squelched = false; + std::stringstream str; + + link.send( + m, + [&](PublicKey const& key, + PeerWPtr const& peerPtr, + std::uint32_t duration) { + assert(key == validator); + auto p = sendSquelch(key, peerPtr, duration); + squelched = true; + str << p << " "; + }); + + if (squelched) + { + auto selected = network_.overlay().getSelected(validator); + str << " selected: "; + for (auto s : selected) + str << s << " "; + if (log) + std::cout + << (double)squelch::epoch(now).count() / + 1000. + << " random, squelched, validator: " << validator.id() + << " peers: " << str.str() << std::endl; + auto countingState = + network_.overlay().isCountingState(validator); + BEAST_EXPECT( + countingState == false && + selected.size() == squelch::MAX_SELECTED_PEERS); + } + + // Trigger Link Down or Peer Disconnect event + // Only one Link Down at a time + if (events[EventType::LinkDown].state_ == State::Off) + { + auto update = [&](EventType event) { + events[event].cnt_++; + events[event].validator_ = validator.id(); + events[event].key_ = validator; + events[event].peer_ = link.peerId(); + events[event].state_ = State::On; + events[event].time_ = now; + if (event == EventType::LinkDown) + { + network_.enableLink( + validator.id(), link.peerId(), false); + events[event].isSelected_ = + network_.overlay().isSelected( + validator, link.peerId()); + } + else + events[event].isSelected_ = + network_.isSelected(link.peerId()); + }; + auto r = rand_int(0, 1000); + if (r == (int)EventType::LinkDown || + r == (int)EventType::PeerDisconnected) + { + update(static_cast(r)); + } + } + + if (events[EventType::PeerDisconnected].state_ == State::On) + { + auto& event = events[EventType::PeerDisconnected]; + bool allCounting = network_.allCounting(event.peer_); + network_.overlay().deletePeer( + event.peer_, + [&](PublicKey const& v, PeerWPtr const& peerPtr) { + if (event.isSelected_) + sendSquelch(v, peerPtr, {}); + event.handled_ = true; + }); + // Should only be unsquelched if the peer is in Selected state + // If in Selected state it's possible unsquelching didn't + // take place because there is no peers in Squelched state in + // any of the slots where the peer is in Selected state + // (allCounting is true) + bool handled = + (event.isSelected_ == false && !event.handled_) || + (event.isSelected_ == true && + 
(event.handled_ || allCounting)); + BEAST_EXPECT(handled); + event.state_ = State::Off; + event.isSelected_ = false; + event.handledCnt_ += handled; + event.handled_ = false; + network_.onDisconnectPeer(event.peer_); + } + + auto& event = events[EventType::LinkDown]; + // Check every sec for idled peers. Idled peers are + // created by Link Down event. + if (now - lastCheck > milliseconds(1000)) + { + lastCheck = now; + // Check if Link Down event must be handled by + // deleteIdlePeer(): 1) the peer is in Selected state; + // 2) the peer has not received any messages for IDLED time; + // 3) there are peers in Squelched state in the slot. + // 4) peer is in Slot's peers_ (if not then it is deleted + // by Slots::deleteIdlePeers()) + bool mustHandle = false; + if (event.state_ == State::On) + { + event.isSelected_ = + network_.overlay().isSelected(event.key_, event.peer_); + auto peers = network_.overlay().getPeers(event.key_); + auto d = squelch::epoch(now).count() - + std::get<3>(peers[event.peer_]); + mustHandle = event.isSelected_ && + d > milliseconds(squelch::IDLED).count() && + network_.overlay().inState( + event.key_, squelch::PeerState::Squelched) > 0 && + peers.find(event.peer_) != peers.end(); + } + network_.overlay().deleteIdlePeers( + [&](PublicKey const& v, PeerWPtr const& ptr) { + event.handled_ = true; + if (mustHandle && v == event.key_) + { + event.state_ = State::WaitReset; + sendSquelch(validator, ptr, {}); + } + }); + bool handled = + (event.handled_ && event.state_ == State::WaitReset) || + (!event.handled_ && !mustHandle); + BEAST_EXPECT(handled); + } + if (event.state_ == State::WaitReset || + (event.state_ == State::On && + (now - event.time_ > (squelch::IDLED + seconds(2))))) + { + bool handled = + event.state_ == State::WaitReset || !event.handled_; + BEAST_EXPECT(handled); + event.state_ = State::Off; + event.isSelected_ = false; + event.handledCnt_ += handled; + event.handled_ = false; + network_.enableLink(event.validator_, event.peer_, true); + } + }); + + auto& down = events[EventType::LinkDown]; + auto& disconnected = events[EventType::PeerDisconnected]; + // It's possible the last Down Link event is not handled + BEAST_EXPECT(down.handledCnt_ >= down.cnt_ - 1); + // All Peer Disconnect events must be handled + BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_); + if (log) + std::cout << "link down count: " << down.cnt_ << "/" + << down.handledCnt_ + << " peer disconnect count: " << disconnected.cnt_ << "/" + << disconnected.handledCnt_; + } + + bool + checkCounting(PublicKey const& validator, bool isCountingState) + { + auto countingState = network_.overlay().isCountingState(validator); + BEAST_EXPECT(countingState == isCountingState); + return countingState == isCountingState; + } + + void + doTest(const std::string& msg, bool log, std::function f) + { + testcase(msg); + f(log); + } + + /** Initial counting round: three peers receive message "faster" then + * others. Once the message count for the three peers reaches threshold + * the rest of the peers are squelched and the slot for the given validator + * is in Selected state. + */ + void + testInitialRound(bool log) + { + doTest("Initial Round", log, [this](bool log) { + BEAST_EXPECT(propagateAndSquelch(log)); + }); + } + + /** Receiving message from squelched peer too soon should not change the + * slot's state to Counting. 
+ */ + void + testPeerUnsquelchedTooSoon(bool log) + { + doTest("Peer Unsquelched Too Soon", log, [this](bool log) { + BEAST_EXPECT(propagateNoSquelch(log, 1, false, false, false)); + }); + } + + /** Receiving message from squelched peer should change the + * slot's state to Counting. + */ + void + testPeerUnsquelched(bool log) + { + ManualClock::advance(seconds(601)); + doTest("Peer Unsquelched", log, [this](bool log) { + BEAST_EXPECT(propagateNoSquelch(log, 2, true, true, false)); + }); + } + + /** Propagate enough messages to generate one squelch event */ + bool + propagateAndSquelch(bool log, bool purge = true, bool resetClock = true) + { + int n = 0; + network_.propagate( + [&](Link& link, MessageSPtr message) { + std::uint16_t squelched = 0; + link.send( + message, + [&](PublicKey const& key, + PeerWPtr const& peerPtr, + std::uint32_t duration) { + squelched++; + sendSquelch(key, peerPtr, duration); + }); + if (squelched) + { + BEAST_EXPECT( + squelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + n++; + } + }, + 1, + squelch::MAX_MESSAGE_THRESHOLD + 2, + purge, + resetClock); + auto selected = network_.overlay().getSelected(network_.validator(0)); + BEAST_EXPECT(selected.size() == squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(n == 1); // only one selection round + auto res = checkCounting(network_.validator(0), false); + BEAST_EXPECT(res); + return n == 1 && res; + } + + /** Send fewer message so that squelch event is not generated */ + bool + propagateNoSquelch( + bool log, + std::uint16_t nMessages, + bool countingState, + bool purge = true, + bool resetClock = true) + { + bool squelched = false; + network_.propagate( + [&](Link& link, MessageSPtr message) { + link.send( + message, + [&](PublicKey const& key, + PeerWPtr const& peerPtr, + std::uint32_t duration) { + squelched = true; + BEAST_EXPECT(false); + }); + }, + 1, + nMessages, + purge, + resetClock); + auto res = checkCounting(network_.validator(0), countingState); + return !squelched && res; + } + + /** Receiving a message from new peer should change the + * slot's state to Counting. + */ + void + testNewPeer(bool log) + { + doTest("New Peer", log, [this](bool log) { + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + network_.addPeer(); + BEAST_EXPECT(propagateNoSquelch(log, 1, true, false, false)); + }); + } + + /** Selected peer disconnects. Should change the state to counting and + * unsquelch squelched peers. */ + void + testSelectedPeerDisconnects(bool log) + { + doTest("Selected Peer Disconnects", log, [this](bool log) { + ManualClock::advance(seconds(601)); + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + auto id = network_.overlay().getSelectedPeer(network_.validator(0)); + std::uint16_t unsquelched = 0; + network_.overlay().deletePeer( + id, [&](PublicKey const& key, PeerWPtr const& peer) { + unsquelched++; + }); + BEAST_EXPECT( + unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(checkCounting(network_.validator(0), true)); + }); + } + + /** Selected peer stops relaying. Should change the state to counting and + * unsquelch squelched peers. 
*/ + void + testSelectedPeerStopsRelaying(bool log) + { + doTest("Selected Peer Stops Relaying", log, [this](bool log) { + ManualClock::advance(seconds(601)); + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + ManualClock::advance(squelch::IDLED + seconds(1)); + std::uint16_t unsquelched = 0; + network_.overlay().deleteIdlePeers( + [&](PublicKey const& key, PeerWPtr const& peer) { + unsquelched++; + }); + auto peers = network_.overlay().getPeers(network_.validator(0)); + BEAST_EXPECT( + unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(checkCounting(network_.validator(0), true)); + }); + } + + /** Squelched peer disconnects. Should not change the state to counting. + */ + void + testSquelchedPeerDisconnects(bool log) + { + doTest("Squelched Peer Disconnects", log, [this](bool log) { + ManualClock::advance(seconds(601)); + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + auto peers = network_.overlay().getPeers(network_.validator(0)); + auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { + return std::get(it.second) == + squelch::PeerState::Squelched; + }); + assert(it != peers.end()); + std::uint16_t unsquelched = 0; + network_.overlay().deletePeer( + it->first, [&](PublicKey const& key, PeerWPtr const& peer) { + unsquelched++; + }); + BEAST_EXPECT(unsquelched == 0); + BEAST_EXPECT(checkCounting(network_.validator(0), false)); + }); + } + + void + testConfig(bool log) + { + doTest("Config Test", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +enable=1 +squelch=1 +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.REDUCE_RELAY_ENABLE == true); + BEAST_EXPECT(c.REDUCE_RELAY_SQUELCH == true); + + Config c1; + + toLoad = (R"rippleConfig( +[reduce_relay] +enable=0 +squelch=0 +)rippleConfig"); + + c1.loadFromString(toLoad); + BEAST_EXPECT(c1.REDUCE_RELAY_ENABLE == false); + BEAST_EXPECT(c1.REDUCE_RELAY_SQUELCH == false); + + Config c2; + + toLoad = R"rippleConfig( +[reduce_relay] +enabled=1 +squelched=1 +)rippleConfig"; + + c2.loadFromString(toLoad); + BEAST_EXPECT(c2.REDUCE_RELAY_ENABLE == false); + BEAST_EXPECT(c2.REDUCE_RELAY_SQUELCH == false); + }); + } + + void + testInternalHashRouter(bool log) + { + doTest("Duplicate Message", log, [&](bool log) { + network_.reset(); + // update message count for the same peer/validator + std::int16_t nMessages = 5; + for (int i = 0; i < nMessages; i++) + { + uint256 key(i); + network_.overlay().updateSlotAndSquelch( + key, + network_.validator(0), + 0, + [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); + } + auto peers = network_.overlay().getPeers(network_.validator(0)); + // first message changes Slot state to Counting and is not counted, + // hence '-1'. 
+ BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); + // add duplicate + uint256 key(nMessages - 1); + network_.overlay().updateSlotAndSquelch( + key, + network_.validator(0), + 0, + [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); + // confirm the same number of messages + peers = network_.overlay().getPeers(network_.validator(0)); + BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); + // advance the clock + ManualClock::advance(squelch::IDLED + seconds(1)); + network_.overlay().updateSlotAndSquelch( + key, + network_.validator(0), + 0, + [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); + peers = network_.overlay().getPeers(network_.validator(0)); + // confirm message number increased + BEAST_EXPECT(std::get<1>(peers[0]) == nMessages); + }); + } + + jtx::Env env_; + Network network_; + +public: + reduce_relay_test() : env_(*this), network_(env_.app()) + { + } + + void + run() override + { + bool log = false; + testConfig(log); + testInitialRound(log); + testPeerUnsquelchedTooSoon(log); + testPeerUnsquelched(log); + testNewPeer(log); + testSquelchedPeerDisconnects(log); + testSelectedPeerDisconnects(log); + testSelectedPeerStopsRelaying(log); + testInternalHashRouter(log); + } +}; + +class reduce_relay_simulate_test : public reduce_relay_test +{ + void + testRandom(bool log) + { + doTest("Random Test", log, [&](bool log) { random(log); }); + } + + void + run() override + { + bool log = false; + testRandom(log); + } +}; + +BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple); +BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple); + +} // namespace test + +} // namespace ripple \ No newline at end of file
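---

The test harness above is built around a handful of reusable patterns. ManualClock is the linchpin: every squelch expiration and idle check exercised by the tests is driven off a clock the test advances by hand, so timing-dependent behavior is deterministic. A minimal self-contained sketch of the same pattern (standard library only; TestClock is an illustrative name, not the patch's code):

    #include <cassert>
    #include <chrono>
    #include <cstdint>

    // A manually-advanced clock exposing the std::chrono clock interface.
    struct TestClock
    {
        using rep = std::uint64_t;
        using period = std::milli;
        using duration = std::chrono::duration<rep, period>;
        using time_point = std::chrono::time_point<TestClock>;
        static constexpr bool is_steady = false;

        static time_point
        now() noexcept
        {
            return now_;
        }

        static void
        advance(duration d) noexcept  // tests move time forward explicitly
        {
            now_ += d;
        }

    private:
        inline static time_point now_{};
    };

    int
    main()
    {
        auto const t0 = TestClock::now();
        TestClock::advance(std::chrono::seconds(601));  // e.g. let a squelch lapse
        assert(TestClock::now() - t0 == TestClock::duration(601000));
    }

Passing the clock as a template parameter (Squelch<ManualClock>, Slots<ManualClock>) is what lets production code use a steady clock while the tests substitute this one.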
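PeerSim models the squelched side of the protocol: a TMSquelch message records a (validator, expiry) entry, and isSquelched() drops that validator's traffic until the squelch lapses. A rough standalone sketch of the bookkeeping squelch::Squelch is assumed to do here (illustrative names and a string key instead of PublicKey; durations assumed to be seconds, as the TMSquelch duration field suggests):

    #include <chrono>
    #include <cstdint>
    #include <map>
    #include <string>

    // Illustrative per-validator squelch table, templated on the clock as in
    // the tests. Mirrors only the behavior PeerSim exercises.
    template <typename Clock>
    class SquelchTable
    {
        using time_point = typename Clock::time_point;

    public:
        // squelched==true starts suppression for `squelchDuration` (assumed
        // seconds); squelched==false clears it immediately.
        void
        squelch(
            std::string const& validator,
            bool squelched,
            std::uint32_t squelchDuration)
        {
            if (squelched)
                expires_[validator] =
                    Clock::now() + std::chrono::seconds(squelchDuration);
            else
                expires_.erase(validator);
        }

        bool
        isSquelched(std::string const& validator) const
        {
            auto it = expires_.find(validator);
            return it != expires_.end() && it->second > Clock::now();
        }

    private:
        std::map<std::string, time_point> expires_;
    };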
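OverlaySim feeds every counted message into squelch::Slots, whose core idea the tests pin down: count messages per peer while in Counting state, and once enough peers pass the message threshold, keep a small Selected set and squelch the rest. A toy reduction of that count-then-select round (the constants are placeholders for MAX_MESSAGE_THRESHOLD and MAX_SELECTED_PEERS; the real Slot.h logic also handles idling, state resets, and random selection among candidates):

    #include <cstddef>
    #include <cstdint>
    #include <map>
    #include <set>

    // Toy selection round: peers that reach the threshold become Selected;
    // when enough are Selected, every other counted peer is squelched.
    class ToySlot
    {
        static constexpr std::uint32_t threshold = 9;  // cf. MAX_MESSAGE_THRESHOLD
        static constexpr std::size_t maxSelected = 3;  // cf. MAX_SELECTED_PEERS

    public:
        // Returns the set of peers to squelch when a round completes.
        std::set<std::uint32_t>
        onMessage(std::uint32_t peer)
        {
            if (!selected_.empty())
                return {};  // selection already made; stay out of Counting
            if (++counts_[peer] >= threshold)
                selected_.insert(peer);
            if (selected_.size() < maxSelected)
                return {};
            std::set<std::uint32_t> squelch;
            for (auto const& [id, cnt] : counts_)
                if (selected_.count(id) == 0)
                    squelch.insert(id);
            counts_.clear();
            return squelch;
        }

    private:
        std::map<std::uint32_t, std::uint32_t> counts_;
        std::set<std::uint32_t> selected_;
    };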
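Network::for_rand and Validator::for_links both randomize visit order so that no validator or link is systematically first in a round, which would bias which peers win selection. The idiom is plain iota-plus-shuffle; spelled out standalone it is:

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <numeric>
    #include <random>
    #include <vector>

    // Visit the half-open range [min, max) in uniformly random order,
    // as Network::for_rand does.
    void
    for_rand(
        std::uint32_t min,
        std::uint32_t max,
        std::function<void(std::uint32_t)> const& f)
    {
        std::vector<std::uint32_t> s(max - min);
        std::iota(s.begin(), s.end(), min);
        std::mt19937 g{std::random_device{}()};
        std::shuffle(s.begin(), s.end(), g);
        for (auto v : s)
            f(v);
    }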
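sendSquelch shows the full shape of a squelch control message: squelch() carries on/off, validatorpubkey() the raw key bytes, and squelchduration() is set only when squelching. Factored out of the test for emphasis (this snippet assumes the patch's ripple.proto additions and ripple's PublicKey; makeSquelch is an illustrative helper, not part of the patch):

    #include <cstdint>
    #include <optional>
    // plus the ripple overlay/protocol headers that define
    // protocol::TMSquelch and PublicKey

    // `duration` is seated only for squelch, unseated for unsquelch.
    protocol::TMSquelch
    makeSquelch(PublicKey const& validator, std::optional<std::uint32_t> duration)
    {
        protocol::TMSquelch squelch;
        squelch.set_squelch(duration.has_value());  // true => suppress relaying
        squelch.set_validatorpubkey(validator.data(), validator.size());
        if (duration)
            squelch.set_squelchduration(*duration);  // suppression lifetime
        return squelch;
    }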
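The random() simulation decides whether deleteIdlePeers() must act on a downed link by comparing the time since the peer's last counted message against squelch::IDLED. Given how the test calls it, squelch::epoch presumably just projects a time_point onto a duration since the clock's epoch; a sketch of both pieces (an assumption about epoch's shape, not a quote of SquelchCommon.h):

    #include <chrono>

    // Presumed shape of squelch::epoch as used above: elapsed time since the
    // clock's epoch, expressed in D (the test uses milliseconds).
    template <typename D, typename TimePoint>
    D
    epoch(TimePoint const& t)
    {
        return std::chrono::duration_cast<D>(t.time_since_epoch());
    }

    // The Link Down check then reduces to: has the peer been silent longer
    // than the idle threshold since its last counted message?
    template <typename TimePoint, typename Duration>
    bool
    hasIdled(TimePoint now, std::chrono::milliseconds lastMessage, Duration idled)
    {
        return epoch<std::chrono::milliseconds>(now) - lastMessage > idled;
    }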
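propagateAndSquelch encodes the selection invariant numerically: after exactly one selection round, every peer that was not selected must have been squelched, so the squelch callback fires MAX_PEERS - MAX_SELECTED_PEERS times. With illustrative values of MAX_PEERS = 10 and MAX_SELECTED_PEERS = 3 (the real constants live in the harness and SquelchCommon.h), that is 7 squelches per round:

    #include <cassert>
    #include <cstdint>

    int
    main()
    {
        // Illustrative stand-ins for MAX_PEERS and MAX_SELECTED_PEERS.
        std::uint16_t const maxPeers = 10;
        std::uint16_t const maxSelected = 3;
        // One selection round squelches every peer that was not selected.
        std::uint16_t const expectSquelched = maxPeers - maxSelected;
        assert(expectSquelched == 7);
    }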
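testConfig doubles as documentation for operators: the feature stays off unless the [reduce_relay] stanza sets enable and squelch, and misspelled keys (enabled, squelched) are silently ignored rather than diagnosed, leaving the defaults in place. Enabling the experimental behavior is just the following (using Config::loadFromString as the test itself does; a real deployment would put the stanza in rippled.cfg):

    Config c;
    c.loadFromString(R"rippleConfig(
    [reduce_relay]
    enable=1
    squelch=1
    )rippleConfig");
    assert(c.REDUCE_RELAY_ENABLE && c.REDUCE_RELAY_SQUELCH);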
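Finally, testInternalHashRouter pins down the counting rules around duplicates: a (message, peer) pair is counted once and not again until the peer has idled past squelch::IDLED, and the message that flips a slot into Counting is itself not counted, which is why the expectations read nMessages - 1. A toy version of the duplicate window alone (illustrative only; the real suppression is the slot-internal hash-router bookkeeping):

    #include <chrono>
    #include <cstdint>
    #include <map>
    #include <utility>

    // Toy duplicate filter: a (key, peer) pair counts again only after the
    // idle window passes. Mirrors the behavior asserted in
    // testInternalHashRouter, not the rippled implementation.
    template <typename Clock>
    class DedupWindow
    {
        using time_point = typename Clock::time_point;

    public:
        explicit DedupWindow(typename Clock::duration window) : window_(window)
        {
        }

        bool
        shouldCount(std::uint64_t key, std::uint32_t peer)
        {
            auto const now = Clock::now();
            auto [it, inserted] =
                seen_.try_emplace(std::make_pair(key, peer), now);
            if (inserted || now - it->second > window_)
            {
                it->second = now;
                return true;  // first sighting, or the pair has idled
            }
            return false;  // duplicate inside the window: do not re-count
        }

    private:
        typename Clock::duration window_;
        std::map<std::pair<std::uint64_t, std::uint32_t>, time_point> seen_;
    };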