diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake
index 97d48bde8a..ceda0df4bc 100644
--- a/Builds/CMake/RippledCore.cmake
+++ b/Builds/CMake/RippledCore.cmake
@@ -867,6 +867,7 @@ target_sources (rippled PRIVATE
     src/test/overlay/cluster_test.cpp
     src/test/overlay/short_read_test.cpp
     src/test/overlay/compression_test.cpp
+    src/test/overlay/reduce_relay_test.cpp
     #[===============================[
        test sources:
        subdir: peerfinder
diff --git a/bin/ci/ubuntu/build-and-test.sh b/bin/ci/ubuntu/build-and-test.sh
index 7ffad801dd..0b77406c94 100755
--- a/bin/ci/ubuntu/build-and-test.sh
+++ b/bin/ci/ubuntu/build-and-test.sh
@@ -136,6 +136,7 @@ else
   # ORDER matters here...sorted in approximately
   # descending execution time (longest running tests at top)
   declare -a manual_tests=(
+    'ripple.ripple_data.reduce_relay_simulate'
     'ripple.ripple_data.digest'
     'ripple.tx.Offer_manual'
     'ripple.app.PayStrandAllPairs'
diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp
index 9b5ae65cd4..fd61ae2bf4 100644
--- a/src/ripple/app/consensus/RCLConsensus.cpp
+++ b/src/ripple/app/consensus/RCLConsensus.cpp
@@ -157,7 +157,7 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
     auto const sig = peerPos.signature();
     prop.set_signature(sig.data(), sig.size());
 
-    app_.overlay().relay(prop, peerPos.suppressionID());
+    app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
 }
 
 void
diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp
index ea47450826..8a8170f486 100644
--- a/src/ripple/app/misc/HashRouter.cpp
+++ b/src/ripple/app/misc/HashRouter.cpp
@@ -49,12 +49,18 @@ HashRouter::addSuppression(uint256 const& key)
 
 bool
 HashRouter::addSuppressionPeer(uint256 const& key, PeerShortID peer)
+{
+    return addSuppressionPeerWithStatus(key, peer).first;
+}
+
+std::pair<bool, std::optional<Stopwatch::time_point>>
+HashRouter::addSuppressionPeerWithStatus(const uint256& key, PeerShortID peer)
 {
     std::lock_guard lock(mutex_);
 
     auto result = emplace(key);
     result.first.addPeer(peer);
-    return result.second;
+    return {result.second, result.first.relayed()};
 }
 
 bool
@@ -110,14 +116,14 @@ HashRouter::setFlags(uint256 const& key, int flags)
 
 auto
 HashRouter::shouldRelay(uint256 const& key)
-    -> boost::optional<std::set<PeerShortID>>
+    -> std::optional<std::set<PeerShortID>>
 {
     std::lock_guard lock(mutex_);
 
     auto& s = emplace(key).first;
 
     if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
-        return boost::none;
+        return {};
 
     return s.releasePeerSet();
 }
diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h
index 9038cf2922..76cf4431ec 100644
--- a/src/ripple/app/misc/HashRouter.h
+++ b/src/ripple/app/misc/HashRouter.h
@@ -97,6 +97,13 @@ private:
         return std::move(peers_);
     }
 
+    /** Return seated relay time point if the message has been relayed */
+    std::optional<Stopwatch::time_point>
+    relayed() const
+    {
+        return relayed_;
+    }
+
     /** Determines if this item should be relayed.
 
         Checks whether the item has been recently relayed.
@@ -142,8 +149,8 @@ private:
     std::set<PeerShortID> peers_;
     // This could be generalized to a map, if more
     // than one flag needs to expire independently.
-    boost::optional<Stopwatch::time_point> relayed_;
-    boost::optional<Stopwatch::time_point> processed_;
+    std::optional<Stopwatch::time_point> relayed_;
+    std::optional<Stopwatch::time_point> processed_;
     std::uint32_t recoveries_ = 0;
 };
 
@@ -185,6 +192,14 @@ public:
     bool
     addSuppressionPeer(uint256 const& key, PeerShortID peer);
 
+    /** Add a suppression peer and get the message's relay status.
+     * Return pair:
+     * element 1: true if the peer is added.
+     * element 2: optional is seated to the relay time point, or
+     * unseated if the message has not been relayed yet. */
+    std::pair<bool, std::optional<Stopwatch::time_point>>
+    addSuppressionPeerWithStatus(uint256 const& key, PeerShortID peer);
+
     bool
     addSuppressionPeer(uint256 const& key, PeerShortID peer, int& flags);
 
@@ -214,11 +229,11 @@ public:
         return `true` again until the hold time has expired.
         The internal set of peers will also be reset.
 
-        @return A `boost::optional` set of peers which do not need to be
+        @return A `std::optional` set of peers which do not need to be
             relayed to. If the result is uninitialized, the item should
             _not_ be relayed.
     */
-    boost::optional<std::set<PeerShortID>>
+    std::optional<std::set<PeerShortID>>
     shouldRelay(uint256 const& key);
 
     /** Determines whether the hashed item should be recovered
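(Note: the pair returned by addSuppressionPeerWithStatus() is what lets the
overlay tell a first arrival apart from a countable duplicate. A minimal
caller-side sketch, mirroring how PeerImp.cpp uses it later in this diff;
`app`, `key`, and `id` are assumed to be in scope, and countTowardSquelch()
is a hypothetical stand-in:

    auto [added, relayed] =
        app.getHashRouter().addSuppressionPeerWithStatus(key, id);
    if (!added && relayed && stopwatch().now() - *relayed < squelch::IDLED)
        countTowardSquelch();  // duplicate soon after the relay: count it

A message that was never relayed, or whose relay is older than IDLED
seconds, is treated as an ordinary suppressed duplicate.)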
diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h
index 7eec3bc076..4bf26a6141 100644
--- a/src/ripple/core/Config.h
+++ b/src/ripple/core/Config.h
@@ -179,6 +179,12 @@ public:
     // Thread pool configuration
     std::size_t WORKERS = 0;
 
+    // Reduce-relay - these parameters are experimental.
+    // Enable reduce-relay functionality
+    bool REDUCE_RELAY_ENABLE = false;
+    // Send squelch message to peers
+    bool REDUCE_RELAY_SQUELCH = false;
+
     // These override the command line client settings
     boost::optional<beast::IP::Endpoint> rpc_ip;
 
diff --git a/src/ripple/core/ConfigSections.h b/src/ripple/core/ConfigSections.h
index 3aae9774d1..313a759c6f 100644
--- a/src/ripple/core/ConfigSections.h
+++ b/src/ripple/core/ConfigSections.h
@@ -70,6 +70,7 @@ struct ConfigSection
 #define SECTION_PATH_SEARCH_MAX "path_search_max"
 #define SECTION_PEER_PRIVATE "peer_private"
 #define SECTION_PEERS_MAX "peers_max"
+#define SECTION_REDUCE_RELAY "reduce_relay"
 #define SECTION_RELAY_PROPOSALS "relay_proposals"
 #define SECTION_RELAY_VALIDATIONS "relay_validations"
 #define SECTION_RPC_STARTUP "rpc_startup"
diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp
index a7aeca8617..bdaa47b28e 100644
--- a/src/ripple/core/impl/Config.cpp
+++ b/src/ripple/core/impl/Config.cpp
@@ -481,6 +481,13 @@ Config::loadFromString(std::string const& fileContents)
     if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
         COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);
 
+    if (exists(SECTION_REDUCE_RELAY))
+    {
+        auto sec = section(SECTION_REDUCE_RELAY);
+        REDUCE_RELAY_ENABLE = sec.value_or("enable", false);
+        REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false);
+    }
+
     if (getSingleSection(
             secConfig, SECTION_AMENDMENT_MAJORITY_TIME, strTemp, j_))
     {
diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h
index e2c081d123..724fad07e6 100644
--- a/src/ripple/overlay/Message.h
+++ b/src/ripple/overlay/Message.h
@@ -21,6 +21,7 @@
 #define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
 
 #include
+#include <ripple/protocol/PublicKey.h>
 #include
 #include
 #include
@@ -55,8 +56,13 @@ public:
     /** Constructor
      * @param message Protocol message to serialize
      * @param type Protocol message type
+     * @param validator Public key of the source validator for a Validation
+     * or Proposal message. Used to check if the message should be squelched.
      */
-    Message(::google::protobuf::Message const& message, int type);
+    Message(
+        ::google::protobuf::Message const& message,
+        int type,
+        boost::optional<PublicKey> const& validator = {});
 
     /** Retrieve the packed message data. If a compressed message is requested
      * but the message is not compressible then the uncompressed buffer is
     * returned.
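(The validator key travels with the message from construction: a relay-path
caller tags the Message with the signer's key, and PeerImp::send(), later in
this diff, drops the send when that key is squelched for the peer. A sketch;
the payload setup here is hypothetical:

    protocol::TMValidation v;
    v.set_validation(payload);  // serialized STValidation, assumed in scope
    auto m = std::make_shared<Message>(v, protocol::mtVALIDATION, signerKey);
    // PeerImp::send(m) then checks squelch_.isSquelched(*m->getValidatorKey())

Messages built without a key - the default - are never squelched.)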
@@ -74,11 +80,19 @@ public: return category_; } + /** Get the validator's key */ + boost::optional const& + getValidatorKey() const + { + return validatorKey_; + } + private: std::vector buffer_; std::vector bufferCompressed_; std::size_t category_; std::once_flag once_flag_; + boost::optional validatorKey_; /** Set the payload header * @param in Pointer to the payload diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index d5f0c4bae2..fd1b9ca34f 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -133,7 +133,7 @@ public: /** Returns the peer with the matching short id, or null. */ virtual std::shared_ptr - findPeerByShortID(Peer::id_t const& id) = 0; + findPeerByShortID(Peer::id_t const& id) const = 0; /** Returns the peer with the matching public key, or null. */ virtual std::shared_ptr @@ -147,13 +147,29 @@ public: virtual void broadcast(protocol::TMValidation& m) = 0; - /** Relay a proposal. */ - virtual void - relay(protocol::TMProposeSet& m, uint256 const& uid) = 0; + /** Relay a proposal. + * @param m the serialized proposal + * @param uid the id used to identify this proposal + * @param validator The pubkey of the validator that issued this proposal + * @return the set of peers which have already sent us this proposal + */ + virtual std::set + relay( + protocol::TMProposeSet& m, + uint256 const& uid, + PublicKey const& validator) = 0; - /** Relay a validation. */ - virtual void - relay(protocol::TMValidation& m, uint256 const& uid) = 0; + /** Relay a validation. + * @param m the serialized validation + * @param uid the id used to identify this validation + * @param validator The pubkey of the validator that issued this validation + * @return the set of peers which have already sent us this validation + */ + virtual std::set + relay( + protocol::TMValidation& m, + uint256 const& uid, + PublicKey const& validator) = 0; /** Visit every active peer. * diff --git a/src/ripple/overlay/Slot.h b/src/ripple/overlay/Slot.h new file mode 100644 index 0000000000..57d1c137a1 --- /dev/null +++ b/src/ripple/overlay/Slot.h @@ -0,0 +1,734 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED +#define RIPPLE_OVERLAY_SLOT_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +namespace squelch { + +template +class Slots; + +/** Peer's State */ +enum class PeerState : uint8_t { + Counting, // counting messages + Selected, // selected to relay, counting if Slot in Counting + Squelched, // squelched, doesn't relay +}; +/** Slot's State */ +enum class SlotState : uint8_t { + Counting, // counting messages + Selected, // peers selected, stop counting +}; + +template +Unit +epoch(TP const& t) +{ + return duration_cast(t.time_since_epoch()); +} + +/** Abstract class. Declares squelch and unsquelch handlers. + * OverlayImpl inherits from this class. Motivation is + * for easier unit tests to facilitate on the fly + * changing callbacks. */ +class SquelchHandler +{ +public: + virtual ~SquelchHandler() + { + } + /** Squelch handler + * @param validator Public key of the source validator + * @param id Peer's id to squelch + * @param duration Squelch duration in seconds + */ + virtual void + squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration) + const = 0; + /** Unsquelch handler + * @param validator Public key of the source validator + * @param id Peer's id to unsquelch + */ + virtual void + unsquelch(PublicKey const& validator, Peer::id_t id) const = 0; +}; + +/** + * Slot is associated with a specific validator via validator's public key. + * Slot counts messages from a validator, selects peers to be the source + * of the messages, and communicates the peers to be squelched. Slot can be + * in the following states: 1) Counting. This is the peer selection state + * when Slot counts the messages and selects the peers; 2) Selected. Slot + * doesn't count messages in Selected state. A message received from + * unsquelched, disconnected peer, or idling peer may transition Slot to + * Counting state. + */ +template +class Slot final +{ +private: + friend class Slots; + using id_t = Peer::id_t; + using time_point = typename clock_type::time_point; + + /** Constructor + * @param journal Journal for logging + * @param handler Squelch/Unsquelch implementation + */ + Slot(SquelchHandler const& handler, beast::Journal journal) + : reachedThreshold_(0) + , lastSelected_(clock_type::now()) + , state_(SlotState::Counting) + , handler_(handler) + , journal_(journal) + { + } + + /** Update peer info. If the message is from a new + * peer or from a previously expired squelched peer then switch + * the peer's and slot's state to Counting. If time of last + * selection round is > 2 * MAX_UNSQUELCH_EXPIRE then switch the slot's + * state to Counting. If the number of messages for the peer + * is > MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. + * If the number of considered peers who reached MAX_MESSAGE_THRESHOLD is + * MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from + * considered peers, and call squelch handler for each peer, which is not + * selected and not already in Squelched state. Set the state for those + * peers to Squelched and reset the count of all peers. Set slot's state to + * Selected. Message count is not updated when the slot is in Selected + * state. 
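 * For example, with the SquelchCommon.h constants: a peer joins the
 * considered pool on its 10th counted message (count > MIN_MESSAGE_THRESHOLD
 * = 9), bumps the reached-threshold tally on its 11th (count ==
 * MAX_MESSAGE_THRESHOLD + 1), and once MAX_SELECTED_PEERS = 3 peers get that
 * far a selection round runs and the remaining peers are squelched.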
     * @param validator Public key of the source validator
     * @param id Peer id which received the message
     * @param type Message type (Validation and Propose Set only,
     *     others are ignored, future use)
     */
    void
    update(PublicKey const& validator, id_t id, protocol::MessageType type);

    /** Handle peer deletion when a peer disconnects.
     * If the peer is in Selected state then
     * call unsquelch handler for every peer in squelched state and reset
     * every peer's state to Counting. Switch Slot's state to Counting.
     * @param validator Public key of the source validator
     * @param id Deleted peer id
     * @param erase If true then erase the peer. The peer is not erased
     *     when it has idled; it is erased when it disconnects.
     */
    void
    deletePeer(PublicKey const& validator, id_t id, bool erase);

    /** Get the time of the last peer selection round */
    const time_point&
    getLastSelected() const
    {
        return lastSelected_;
    }

    /** Return number of peers in state */
    std::uint16_t
    inState(PeerState state) const;

    /** Return number of peers not in state */
    std::uint16_t
    notInState(PeerState state) const;

    /** Return Slot's state */
    SlotState
    getState() const
    {
        return state_;
    }

    /** Return selected peers */
    std::set<id_t>
    getSelected() const;

    /** Get peers info. Return map of peer's state, count, squelch
     * expiration (milliseconds), and last message time (milliseconds).
     */
    std::unordered_map<
        id_t,
        std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>
    getPeers() const;

    /** Check if peers stopped relaying messages. If an idled peer is a
     * selected peer then call unsquelch handler for all
     * currently squelched peers and switch the slot to
     * Counting state.
     * @param validator Public key of the source validator
     */
    void
    deleteIdlePeer(PublicKey const& validator);

private:
    /** Reset counts of peers in Selected or Counting state */
    void
    resetCounts();

    /** Initialize slot to Counting state */
    void
    initCounting();

    /** Data maintained for each peer */
    struct PeerInfo
    {
        PeerState state;         // peer's state
        std::size_t count;       // message count
        time_point expire;       // squelch expiration time
        time_point lastMessage;  // time last message received
    };
    std::unordered_map<id_t, PeerInfo> peers_;  // peer's data
    // pool of peers considered as the source of messages
    // from validator - peers that reached MIN_MESSAGE_THRESHOLD
    std::unordered_set<id_t> considered_;
    // number of peers that reached MAX_MESSAGE_THRESHOLD
    std::uint16_t reachedThreshold_;
    // last time peers were selected, used to age the slot
    typename clock_type::time_point lastSelected_;
    SlotState state_;                // slot's state
    SquelchHandler const& handler_;  // squelch/unsquelch handler
    beast::Journal const journal_;   // logging
};

template <typename clock_type>
void
Slot<clock_type>::deleteIdlePeer(PublicKey const& validator)
{
    auto now = clock_type::now();
    for (auto it = peers_.begin(); it != peers_.end();)
    {
        auto& peer = it->second;
        auto id = it->first;
        ++it;
        if (now - peer.lastMessage > IDLED)
        {
            JLOG(journal_.debug())
                << "deleteIdlePeer: " << Slice(validator) << " " << id
                << " idled "
                << duration_cast<milliseconds>(now - peer.lastMessage).count()
                << " selected " << (peer.state == PeerState::Selected);
            deletePeer(validator, id, false);
        }
    }
}

template <typename clock_type>
void
Slot<clock_type>::update(
    PublicKey const& validator,
    id_t id,
    protocol::MessageType type)
{
    auto now = clock_type::now();
    auto it = peers_.find(id);
    // First message from this peer
    if (it == peers_.end())
    {
        JLOG(journal_.debug())
            << "update: adding peer " << 
Slice(validator) << " " << id; + peers_.emplace( + std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now})); + initCounting(); + return; + } + // Message from a peer with expired squelch + if (it->second.state == PeerState::Squelched && now > it->second.expire) + { + JLOG(journal_.debug()) + << "update: squelch expired " << Slice(validator) << " " << id; + it->second.state = PeerState::Counting; + it->second.lastMessage = now; + initCounting(); + return; + } + + auto& peer = it->second; + + JLOG(journal_.debug()) + << "update: existing peer " << Slice(validator) << " " << id + << " slot state " << static_cast(state_) << " peer state " + << static_cast(peer.state) << " count " << peer.count << " last " + << duration_cast(now - peer.lastMessage).count() + << " pool " << considered_.size() << " threshold " << reachedThreshold_ + << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal"); + + peer.lastMessage = now; + + if (state_ != SlotState::Counting || peer.state == PeerState::Squelched) + return; + + if (++peer.count > MIN_MESSAGE_THRESHOLD) + considered_.insert(id); + if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) + ++reachedThreshold_; + + if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE) + { + JLOG(journal_.debug()) + << "update: resetting due to inactivity " << Slice(validator) << " " + << id << " " << duration_cast(now - lastSelected_).count(); + initCounting(); + return; + } + + if (reachedThreshold_ == MAX_SELECTED_PEERS) + { + // Randomly select MAX_SELECTED_PEERS peers from considered. + // Exclude peers that have been idling > IDLED - + // it's possible that deleteIdlePeer() has not been called yet. + // If number of remaining peers != MAX_SELECTED_PEERS + // then reset the Counting state and let deleteIdlePeer() handle + // idled peers. + std::unordered_set selected; + auto const consideredPoolSize = considered_.size(); + while (selected.size() != MAX_SELECTED_PEERS && considered_.size() != 0) + { + auto i = + considered_.size() == 1 ? 
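/* rand_int(n) draws uniformly from the closed range [0, n],
   hence size() - 1 below */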
0 : rand_int(considered_.size() - 1); + auto it = std::next(considered_.begin(), i); + auto id = *it; + considered_.erase(it); + auto const& itpeers = peers_.find(id); + if (itpeers == peers_.end()) + { + JLOG(journal_.error()) << "update: peer not found " + << Slice(validator) << " " << id; + continue; + } + if (now - itpeers->second.lastMessage < IDLED) + selected.insert(id); + } + + if (selected.size() != MAX_SELECTED_PEERS) + { + JLOG(journal_.debug()) + << "update: selection failed " << Slice(validator) << " " << id; + initCounting(); + return; + } + + lastSelected_ = now; + + auto s = selected.begin(); + JLOG(journal_.debug()) + << "update: " << Slice(validator) << " " << id << " pool size " + << consideredPoolSize << " selected " << *s << " " + << *std::next(s, 1) << " " << *std::next(s, 2); + + // squelch peers which are not selected and + // not already squelched + std::stringstream str; + for (auto& [k, v] : peers_) + { + v.count = 0; + + if (selected.find(k) != selected.end()) + v.state = PeerState::Selected; + else if (v.state != PeerState::Squelched) + { + if (journal_.debug()) + str << k << " "; + v.state = PeerState::Squelched; + auto duration = Squelch::getSquelchDuration(); + v.expire = now + duration; + handler_.squelch( + validator, + k, + duration_cast(duration).count()); + } + } + JLOG(journal_.debug()) << "update: squelching " << Slice(validator) + << " " << id << " " << str.str(); + considered_.clear(); + reachedThreshold_ = 0; + state_ = SlotState::Selected; + } +} + +template +void +Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) +{ + auto it = peers_.find(id); + if (it != peers_.end()) + { + JLOG(journal_.debug()) + << "deletePeer: " << Slice(validator) << " " << id << " selected " + << (it->second.state == PeerState::Selected) << " considered " + << (considered_.find(id) != considered_.end()) << " erase " + << erase; + auto now = clock_type::now(); + if (it->second.state == PeerState::Selected) + { + for (auto& [k, v] : peers_) + { + if (v.state == PeerState::Squelched) + handler_.unsquelch(validator, k); + v.state = PeerState::Counting; + v.count = 0; + v.expire = now; + } + + considered_.clear(); + reachedThreshold_ = 0; + state_ = SlotState::Counting; + } + else if (considered_.find(id) != considered_.end()) + { + if (it->second.count > MAX_MESSAGE_THRESHOLD) + --reachedThreshold_; + considered_.erase(id); + } + + it->second.lastMessage = now; + it->second.count = 0; + + if (erase) + peers_.erase(it); + } +} + +template +void +Slot::resetCounts() +{ + for (auto& [_, peer] : peers_) + { + (void)_; + peer.count = 0; + } +} + +template +void +Slot::initCounting() +{ + state_ = SlotState::Counting; + considered_.clear(); + reachedThreshold_ = 0; + resetCounts(); +} + +template +std::uint16_t +Slot::inState(PeerState state) const +{ + return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { + return (it.second.state == state); + }); +} + +template +std::uint16_t +Slot::notInState(PeerState state) const +{ + return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { + return (it.second.state != state); + }); +} + +template +std::set +Slot::getSelected() const +{ + std::set init; + return std::accumulate( + peers_.begin(), peers_.end(), init, [](auto& init, auto const& it) { + if (it.second.state == PeerState::Selected) + { + init.insert(it.first); + return init; + } + return init; + }); +} + +template +std::unordered_map< + typename Peer::id_t, + std::tuple> +Slot::getPeers() const +{ + auto init = std::unordered_map< 
+ id_t, + std::tuple>(); + return std::accumulate( + peers_.begin(), peers_.end(), init, [](auto& init, auto const& it) { + init.emplace(std::make_pair( + it.first, + std::move(std::make_tuple( + it.second.state, + it.second.count, + epoch(it.second.expire).count(), + epoch(it.second.lastMessage).count())))); + return init; + }); +} + +/** Slots is a container for validator's Slot and handles Slot update + * when a message is received from a validator. It also handles Slot aging + * and checks for peers which are disconnected or stopped relaying the messages. + */ +template +class Slots final +{ + using time_point = typename clock_type::time_point; + using id_t = typename Peer::id_t; + using messages = beast::aged_unordered_map< + uint256, + std::unordered_set, + clock_type, + hardened_hash>; + +public: + /** + * @param app Applicaton reference + * @param handler Squelch/unsquelch implementation + */ + Slots(Application& app, SquelchHandler const& handler) + : handler_(handler), app_(app), journal_(app.journal("Slots")) + { + } + ~Slots() = default; + /** Calls Slot::update of Slot associated with the validator. + * @param key Message's hash + * @param validator Validator's public key + * @param id Peer's id which received the message + * @param id Peer's pointer which received the message + * @param type Received protocol message type + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + id_t id, + protocol::MessageType type); + + /** Check if peers stopped relaying messages + * and if slots stopped receiving messages from the validator. + */ + void + deleteIdlePeers(); + + /** Return number of peers in state */ + std::optional + inState(PublicKey const& validator, PeerState state) const + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.inState(state); + return {}; + } + + /** Return number of peers not in state */ + std::optional + notInState(PublicKey const& validator, PeerState state) const + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.notInState(state); + return {}; + } + + /** Return true if Slot is in state */ + bool + inState(PublicKey const& validator, SlotState state) const + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.state_ == state; + return false; + } + + /** Get selected peers */ + std::set + getSelected(PublicKey const& validator) + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.getSelected(); + return {}; + } + + /** Get peers info. Return map of peer's state, count, and squelch + * expiration milliseconds. + */ + std::unordered_map< + typename Peer::id_t, + std::tuple> + getPeers(PublicKey const& validator) + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.getPeers(); + return {}; + } + + /** Get Slot's state */ + std::optional + getState(PublicKey const& validator) + { + auto const& it = slots_.find(validator); + if (it != slots_.end()) + return it->second.getState(); + return {}; + } + + /** Called when a peer is deleted. If the peer was selected to be the + * source of messages from the validator then squelched peers have to be + * unsquelched. + * @param id Peer's id + * @param erase If true then erase the peer + */ + void + deletePeer(id_t id, bool erase); + +private: + /** Add message/peer if have not seen this message + * from the peer. A message is aged after IDLED seconds. 
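 * (A repeat from the same peer within IDLED seconds returns false, so a
 * message is counted at most once per peer toward the Slot's tallies.)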
+ * Return true if added */ + bool + addPeerMessage(uint256 const& key, id_t id); + + hash_map> slots_; + SquelchHandler const& handler_; // squelch/unsquelch handler + Application& app_; + beast::Journal const journal_; + // Maintain aged container of message/peers. This is required + // to discard duplicate message from the same peer. A message + // is aged after IDLED seconds. A message received IDLED seconds + // after it was relayed is ignored by PeerImp. + inline static messages peersWithMessage_{ + beast::get_abstract_clock()}; +}; + +template +bool +Slots::addPeerMessage(uint256 const& key, id_t id) +{ + beast::expire(peersWithMessage_, squelch::IDLED); + + if (key.isNonZero()) + { + auto it = peersWithMessage_.find(key); + if (it == peersWithMessage_.end()) + { + JLOG(journal_.trace()) + << "addPeerMessage: new " << to_string(key) << " " << id; + peersWithMessage_.emplace(key, std::unordered_set{id}); + return true; + } + + if (it->second.find(id) != it->second.end()) + { + JLOG(journal_.trace()) << "addPeerMessage: duplicate message " + << to_string(key) << " " << id; + return false; + } + + JLOG(journal_.trace()) + << "addPeerMessage: added " << to_string(key) << " " << id; + + it->second.insert(id); + } + + return true; +} + +template +void +Slots::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + id_t id, + protocol::MessageType type) +{ + if (!addPeerMessage(key, id)) + return; + + auto it = slots_.find(validator); + if (it == slots_.end()) + { + JLOG(journal_.debug()) + << "updateSlotAndSquelch: new slot " << Slice(validator); + auto it = slots_ + .emplace(std::make_pair( + validator, + Slot(handler_, app_.journal("Slot")))) + .first; + it->second.update(validator, id, type); + } + else + it->second.update(validator, id, type); +} + +template +void +Slots::deletePeer(id_t id, bool erase) +{ + for (auto& [validator, slot] : slots_) + slot.deletePeer(validator, id, erase); +} + +template +void +Slots::deleteIdlePeers() +{ + auto now = clock_type::now(); + + for (auto it = slots_.begin(); it != slots_.end();) + { + it->second.deleteIdlePeer(it->first); + if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE) + { + JLOG(journal_.debug()) + << "deleteIdlePeers: deleting idle slot " << Slice(it->first); + it = slots_.erase(it); + } + else + ++it; + } +} + +} // namespace squelch + +} // namespace ripple + +#endif // RIPPLE_OVERLAY_SLOT_H_INCLUDED diff --git a/src/ripple/overlay/Squelch.h b/src/ripple/overlay/Squelch.h new file mode 100644 index 0000000000..ab2fa65fba --- /dev/null +++ b/src/ripple/overlay/Squelch.h @@ -0,0 +1,122 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED +#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED + +#include +#include +#include + +#include +#include + +namespace ripple { + +namespace squelch { + +/** Maintains squelching of relaying messages from validators */ +template +class Squelch +{ + using time_point = typename clock_type::time_point; + +public: + Squelch() = default; + virtual ~Squelch() = default; + + /** Squelch/Unsquelch relaying for the validator + * @param validator The validator's public key + * @param squelch Squelch/unsquelch flag + * @param squelchDuration Squelch duration time if squelch is true + */ + void + squelch(PublicKey const& validator, bool squelch, uint64_t squelchDuration); + + /** Are the messages to this validator squelched + * @param validator Validator's public key + * @return true if squelched + */ + bool + isSquelched(PublicKey const& validator); + + /** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and + * MAX_UNSQUELCH_EXPIRE */ + static seconds + getSquelchDuration(); + +private: + /** Maintains the list of squelched relaying to downstream peers. + * Expiration time is included in the TMSquelch message. */ + hash_map squelched_; +}; + +template +void +Squelch::squelch( + PublicKey const& validator, + bool squelch, + uint64_t squelchDuration) +{ + if (squelch) + { + squelched_[validator] = [squelchDuration]() { + seconds duration = seconds(squelchDuration); + return clock_type::now() + + ((duration >= MIN_UNSQUELCH_EXPIRE && + duration <= MAX_UNSQUELCH_EXPIRE) + ? duration + : getSquelchDuration()); + }(); + } + else + squelched_.erase(validator); +} + +template +bool +Squelch::isSquelched(PublicKey const& validator) +{ + auto now = clock_type::now(); + + auto const& it = squelched_.find(validator); + if (it == squelched_.end()) + return false; + else if (it->second > now) + return true; + + squelched_.erase(it); + + return false; +} + +template +seconds +Squelch::getSquelchDuration() +{ + auto d = seconds(ripple::rand_int( + MIN_UNSQUELCH_EXPIRE.count(), MAX_UNSQUELCH_EXPIRE.count())); + return d; +} + +} // namespace squelch + +} // namespace ripple + +#endif // RIPPLED_SQUELCH_H diff --git a/src/ripple/overlay/SquelchCommon.h b/src/ripple/overlay/SquelchCommon.h new file mode 100644 index 0000000000..dea68a72d2 --- /dev/null +++ b/src/ripple/overlay/SquelchCommon.h @@ -0,0 +1,52 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED +#define RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED +#include + +namespace ripple { + +namespace squelch { + +using namespace std::chrono; + +// Peer's squelch is limited in time to +// rand{MIN_UNSQUELCH_EXPIRE, MAX_UNSQUELCH_EXPIRE} +static constexpr seconds MIN_UNSQUELCH_EXPIRE = seconds{300}; +static constexpr seconds MAX_UNSQUELCH_EXPIRE = seconds{600}; +// No message received threshold before identifying a peer as idled +static constexpr seconds IDLED = seconds{8}; +// Message count threshold to start selecting peers as the source +// of messages from the validator. We add peers who reach +// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS +// reach MAX_MESSAGE_THRESHOLD. +static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9; +static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10; +// Max selected peers to choose as the source of messages from validator +static constexpr uint16_t MAX_SELECTED_PEERS = 3; +// Wait before reduce-relay feature is enabled on boot up to let +// the server establish peer connections +static constexpr minutes WAIT_ON_BOOTUP = minutes{10}; + +} // namespace squelch + +} // namespace ripple + +#endif // RIPPLED_SQUELCHCOMMON_H diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index 29440c44e4..b5b24c3d3e 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -23,8 +23,12 @@ namespace ripple { -Message::Message(::google::protobuf::Message const& message, int type) +Message::Message( + ::google::protobuf::Message const& message, + int type, + boost::optional const& validator) : category_(TrafficCount::categorize(message, type, false)) + , validatorKey_(validator) { using namespace ripple::compression; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 9e8d58bc7e..af8c90711e 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -103,6 +103,9 @@ OverlayImpl::Timer::on_timer(error_code ec) if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0) overlay_.check(); + if ((overlay_.timer_count_ % Tuning::checkIdlePeers) == 0) + overlay_.deleteIdlePeers(); + timer_.expires_from_now(std::chrono::seconds(1)); timer_.async_wait(overlay_.strand_.wrap(std::bind( &Timer::on_timer, shared_from_this(), std::placeholders::_1))); @@ -139,6 +142,7 @@ OverlayImpl::OverlayImpl( , m_resolver(resolver) , next_id_(1) , timer_count_(0) + , slots_(app, *this) , m_stats( std::bind(&OverlayImpl::collect_metrics, this), collector, @@ -1216,7 +1220,7 @@ OverlayImpl::check() } std::shared_ptr -OverlayImpl::findPeerByShortID(Peer::id_t const& id) +OverlayImpl::findPeerByShortID(Peer::id_t const& id) const { std::lock_guard lock(mutex_); auto const iter = ids_.find(id); @@ -1249,18 +1253,23 @@ OverlayImpl::broadcast(protocol::TMProposeSet& m) for_each([&](std::shared_ptr&& p) { p->send(sm); }); } -void -OverlayImpl::relay(protocol::TMProposeSet& m, uint256 const& uid) +std::set +OverlayImpl::relay( + protocol::TMProposeSet& m, + uint256 const& uid, + PublicKey const& validator) { if (auto const toSkip = app_.getHashRouter().shouldRelay(uid)) { auto const sm = - std::make_shared(m, protocol::mtPROPOSE_LEDGER); + std::make_shared(m, protocol::mtPROPOSE_LEDGER, validator); for_each([&](std::shared_ptr&& p) { if (toSkip->find(p->id()) == toSkip->end()) p->send(sm); }); + return 
*toSkip; } + return {}; } void @@ -1270,17 +1279,23 @@ OverlayImpl::broadcast(protocol::TMValidation& m) for_each([sm](std::shared_ptr&& p) { p->send(sm); }); } -void -OverlayImpl::relay(protocol::TMValidation& m, uint256 const& uid) +std::set +OverlayImpl::relay( + protocol::TMValidation& m, + uint256 const& uid, + PublicKey const& validator) { if (auto const toSkip = app_.getHashRouter().shouldRelay(uid)) { - auto const sm = std::make_shared(m, protocol::mtVALIDATION); + auto const sm = + std::make_shared(m, protocol::mtVALIDATION, validator); for_each([&](std::shared_ptr&& p) { if (toSkip->find(p->id()) == toSkip->end()) p->send(sm); }); + return *toSkip; } + return {}; } //------------------------------------------------------------------------------ @@ -1352,6 +1367,100 @@ OverlayImpl::sendEndpoints() } } +std::shared_ptr +makeSquelchMessage( + PublicKey const& validator, + bool squelch, + uint64_t squelchDuration) +{ + protocol::TMSquelch m; + m.set_squelch(squelch); + m.set_validatorpubkey(validator.data(), validator.size()); + if (squelch) + m.set_squelchduration(squelchDuration); + return std::make_shared(m, protocol::mtSQUELCH); +} + +void +OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const +{ + if (auto peer = findPeerByShortID(id); + peer && app_.config().REDUCE_RELAY_SQUELCH) + { + // optimize - multiple message with different + // validator might be sent to the same peer + auto m = makeSquelchMessage(validator, false, 0); + peer->send(m); + } +} + +void +OverlayImpl::squelch( + PublicKey const& validator, + Peer::id_t id, + uint32_t squelchDuration) const +{ + if (auto peer = findPeerByShortID(id); + peer && app_.config().REDUCE_RELAY_SQUELCH) + { + auto m = makeSquelchMessage(validator, true, squelchDuration); + peer->send(m); + } +} + +void +OverlayImpl::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + std::set&& peers, + protocol::MessageType type) +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, + [this, key, validator, peers = std::move(peers), type]() mutable { + updateSlotAndSquelch(key, validator, std::move(peers), type); + }); + + for (auto id : peers) + slots_.updateSlotAndSquelch(key, validator, id, type); +} + +void +OverlayImpl::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer, + protocol::MessageType type) +{ + if (!strand_.running_in_this_thread()) + return post(strand_, [this, key, validator, peer, type]() { + updateSlotAndSquelch(key, validator, peer, type); + }); + + slots_.updateSlotAndSquelch(key, validator, peer, type); +} + +void +OverlayImpl::deletePeer(Peer::id_t id) +{ + if (!strand_.running_in_this_thread()) + return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id)); + + slots_.deletePeer(id, true); +} + +void +OverlayImpl::deleteIdlePeers() +{ + if (!strand_.running_in_this_thread()) + return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this)); + + slots_.deleteIdlePeers(); +} + +//------------------------------------------------------------------------------ + Overlay::Setup setup_Overlay(BasicConfig const& config) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 099b90556d..abbae3c925 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -52,7 +53,7 @@ namespace ripple { class PeerImp; class BasicConfig; -class OverlayImpl : public Overlay 
+class OverlayImpl : public Overlay, public squelch::SquelchHandler { public: class Child @@ -124,6 +125,8 @@ private: boost::optional networkID_; + squelch::Slots slots_; + //-------------------------------------------------------------------------- public: @@ -195,7 +198,7 @@ public: void checkSanity(std::uint32_t) override; std::shared_ptr - findPeerByShortID(Peer::id_t const& id) override; + findPeerByShortID(Peer::id_t const& id) const override; std::shared_ptr findPeerByPublicKey(PublicKey const& pubKey) override; @@ -206,11 +209,17 @@ public: void broadcast(protocol::TMValidation& m) override; - void - relay(protocol::TMProposeSet& m, uint256 const& uid) override; + std::set + relay( + protocol::TMProposeSet& m, + uint256 const& uid, + PublicKey const& validator) override; - void - relay(protocol::TMValidation& m, uint256 const& uid) override; + std::set + relay( + protocol::TMValidation& m, + uint256 const& uid, + PublicKey const& validator) override; //-------------------------------------------------------------------------- // @@ -364,7 +373,49 @@ public: void lastLink(std::uint32_t id); + /** Updates message count for validator/peer. Sends TMSquelch if the number + * of messages for N peers reaches threshold T. A message is counted + * if a peer receives the message for the first time and if + * the message has been relayed. + * @param key Unique message's key + * @param validator Validator's public key + * @param peers Peers' id to update the slots for + * @param type Received protocol message type + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + std::set&& peers, + protocol::MessageType type); + + /** Overload to reduce allocation in case of single peer + */ + void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer, + protocol::MessageType type); + + /** Called when the peer is deleted. If the peer was selected to be the + * source of messages from the validator then squelched peers have to be + * unsquelched. 
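+     * (Like the other slot entry points, the implementation re-posts
+     * itself to the overlay strand when invoked from another thread.)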
+ * @param id Peer's id + */ + void + deletePeer(Peer::id_t id); + private: + void + squelch( + PublicKey const& validator, + Peer::id_t const id, + std::uint32_t squelchDuration) const override; + + void + unsquelch(PublicKey const& validator, Peer::id_t id) const override; + std::shared_ptr makeRedirectResponse( std::shared_ptr const& slot, @@ -481,6 +532,11 @@ private: void sendEndpoints(); + /** Check if peers stopped relaying messages + * and if slots stopped receiving messages from the validator */ + void + deleteIdlePeers(); + private: struct TrafficGauges { diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index b3c00894d8..3e0c1ae26c 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -100,6 +100,7 @@ PeerImp::~PeerImp() { const bool inCluster{cluster()}; + overlay_.deletePeer(id_); if (state_ == State::active) overlay_.onPeerDeactivate(id_); overlay_.peerFinder().on_closed(slot_); @@ -226,6 +227,10 @@ PeerImp::send(std::shared_ptr const& m) if (detaching_) return; + auto validator = m->getValidatorKey(); + if (validator && squelch_.isSquelched(*validator)) + return; + overlay_.reportTraffic( safe_cast(m->getCategory()), false, @@ -1647,8 +1652,20 @@ PeerImp::onMessage(std::shared_ptr const& m) publicKey.slice(), sig); - if (!app_.getHashRouter().addSuppressionPeer(suppression, id_)) + if (auto [added, relayed] = + app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_); + !added) { + // Count unique messages (Slots has it's own 'HashRouter'), which a peer + // receives within IDLED seconds since the message has been relayed. + // Wait WAIT_ON_BOOTUP time to let the server establish connections to + // peers. + if (app_.config().REDUCE_RELAY_ENABLE && relayed && + (stopwatch().now() - *relayed) < squelch::IDLED && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + overlay_.updateSlotAndSquelch( + suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); JLOG(p_journal_.trace()) << "Proposal: duplicate"; return; } @@ -2146,9 +2163,21 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } - if (!app_.getHashRouter().addSuppressionPeer( - sha512Half(makeSlice(m->validation())), id_)) + auto key = sha512Half(makeSlice(m->validation())); + if (auto [added, relayed] = + app_.getHashRouter().addSuppressionPeerWithStatus(key, id_); + !added) { + // Count unique messages (Slots has it's own 'HashRouter'), which a + // peer receives within IDLED seconds since the message has been + // relayed. Wait WAIT_ON_BOOTUP time to let the server establish + // connections to peers. + if (app_.config().REDUCE_RELAY_ENABLE && (bool)relayed && + (stopwatch().now() - *relayed) < squelch::IDLED && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + overlay_.updateSlotAndSquelch( + key, val->getSignerPublic(), id_, protocol::mtVALIDATION); JLOG(p_journal_.trace()) << "Validation: duplicate"; return; } @@ -2324,6 +2353,45 @@ PeerImp::onMessage(std::shared_ptr const& m) } } +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (!m->has_validatorpubkey()) + { + charge(Resource::feeBadData); + return; + } + auto validator = m->validatorpubkey(); + auto const slice{makeSlice(validator)}; + if (!publicKeyType(slice)) + { + charge(Resource::feeBadData); + return; + } + PublicKey key(slice); + auto squelch = m->squelch(); + auto duration = m->has_squelchduration() ? m->squelchduration() : 0; + auto sp = shared_from_this(); + + // Ignore the squelch for validator's own messages. 
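+    // (Honoring it would let a remote peer stop this server from relaying
+    // the proposals and validations it signs itself.)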
+ if (key == app_.getValidationPublicKey()) + { + JLOG(p_journal_.debug()) + << "onMessage: TMSquelch discarding validator's squelch " << slice; + return; + } + + if (!strand_.running_in_this_thread()) + return post(strand_, [sp, key, squelch, duration]() { + sp->squelch_.squelch(key, squelch, duration); + }); + + JLOG(p_journal_.debug()) + << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; + + squelch_.squelch(key, squelch, duration); +} + //-------------------------------------------------------------------------- void @@ -2478,7 +2546,22 @@ PeerImp::checkPropose( relay = app_.config().RELAY_UNTRUSTED_PROPOSALS || cluster(); if (relay) - app_.overlay().relay(*packet, peerPos.suppressionID()); + { + // haveMessage contains peers, which are suppressed; i.e. the peers + // are the source of the message, consequently the message should + // not be relayed to these peers. But the message must be counted + // as part of the squelch logic. + auto haveMessage = app_.overlay().relay( + *packet, peerPos.suppressionID(), peerPos.publicKey()); + if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + overlay_.updateSlotAndSquelch( + peerPos.suppressionID(), + peerPos.publicKey(), + std::move(haveMessage), + protocol::mtPROPOSE_LEDGER); + } } void @@ -2501,7 +2584,22 @@ PeerImp::checkValidation( { auto const suppression = sha512Half(makeSlice(val->getSerialized())); - overlay_.relay(*packet, suppression); + // haveMessage contains peers, which are suppressed; i.e. the peers + // are the source of the message, consequently the message should + // not be relayed to these peers. But the message must be counted + // as part of the squelch logic. + auto haveMessage = + overlay_.relay(*packet, suppression, val->getSignerPublic()); + if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && + squelch::epoch(UptimeClock::now()) > + squelch::WAIT_ON_BOOTUP) + { + overlay_.updateSlotAndSquelch( + suppression, + val->getSignerPublic(), + std::move(haveMessage), + protocol::mtVALIDATION); + } } } catch (std::exception const&) diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 4b279dea65..e9937cdb06 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -142,6 +143,8 @@ private: clock_type::time_point lastPingTime_; clock_type::time_point const creationTime_; + squelch::Squelch squelch_; + // Notes on thread locking: // // During an audit it was noted that some member variables that looked @@ -549,6 +552,8 @@ public: onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); private: State diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index b929f91ba4..eb7dd18f09 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -77,6 +77,8 @@ protocolMessageName(int type) return "validation"; case protocol::mtGET_OBJECTS: return "get_objects"; + case protocol::mtSQUELCH: + return "squelch"; default: break; } @@ -375,6 +377,10 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler) success = detail::invoke( *header, buffers, handler); break; + case protocol::mtSQUELCH: + success = + detail::invoke(*header, buffers, handler); + break; default: handler.onMessageUnknown(header->message_type); 
success = true; diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index 0920c77e3a..bd199fc0a7 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -71,6 +71,9 @@ enum { /** How often to log send queue size */ sendQueueLogFreq = 64, + + /** How often we check for idle peers (seconds) */ + checkIdlePeers = 4, }; /** The threshold above which we treat a peer connection as high latency */ diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 6038dac10a..fea849d89c 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -23,6 +23,7 @@ enum MessageType mtGET_PEER_SHARD_INFO = 52; mtPEER_SHARD_INFO = 53; mtVALIDATORLIST = 54; + mtSQUELCH = 55; } // token, iterations, target, challenge = issue demand for proof of work @@ -356,3 +357,10 @@ message TMPing optional uint64 netTime = 4; } +message TMSquelch +{ + required bool squelch = 1; // squelch if true, otherwise unsquelch + required bytes validatorPubKey = 2; // validator's public key + optional uint32 squelchDuration = 3; // squelch duration in milliseconds +} + diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 4e5c7e2696..9162fb9b1e 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -191,7 +191,7 @@ class HashRouter_test : public beast::unit_test::suite uint256 const key1(1); - boost::optional> peers; + std::optional> peers; peers = router.shouldRelay(key1); BEAST_EXPECT(peers && peers->empty()); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp new file mode 100644 index 0000000000..5e8f05d63b --- /dev/null +++ b/src/test/overlay/reduce_relay_test.cpp @@ -0,0 +1,1414 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace ripple { + +namespace test { + +using namespace std::chrono; + +class Link; + +using MessageSPtr = std::shared_ptr; +using LinkSPtr = std::shared_ptr; +using PeerSPtr = std::shared_ptr; +using PeerWPtr = std::weak_ptr; +using SquelchCB = + std::function; +using UnsquelchCB = std::function; +using LinkIterCB = std::function; + +static constexpr std::uint32_t MAX_PEERS = 10; +static constexpr std::uint32_t MAX_VALIDATORS = 10; +static constexpr std::uint32_t MAX_MESSAGES = 200000; + +/** Simulate two entities - peer directly connected to the server + * (via squelch in PeerSim) and PeerImp (via Overlay) + */ +class PeerPartial : public Peer +{ +public: + virtual ~PeerPartial() + { + } + virtual void + onMessage(MessageSPtr const& m, SquelchCB f) = 0; + virtual void + onMessage(protocol::TMSquelch const& squelch) = 0; + void + send(protocol::TMSquelch const& squelch) + { + onMessage(squelch); + } + + // dummy implementation + void + send(std::shared_ptr const& m) override + { + } + beast::IP::Endpoint + getRemoteAddress() const override + { + return {}; + } + void + charge(Resource::Charge const& fee) override + { + } + bool + cluster() const override + { + return false; + } + bool + isHighLatency() const override + { + return false; + } + int + getScore(bool) const override + { + return 0; + } + PublicKey const& + getNodePublic() const override + { + static PublicKey key{}; + return key; + } + Json::Value + json() override + { + return {}; + } + bool + supportsFeature(ProtocolFeature f) const override + { + return false; + } + boost::optional + publisherListSequence(PublicKey const&) const override + { + return {}; + } + void + setPublisherListSequence(PublicKey const&, std::size_t const) override + { + } + uint256 const& + getClosedLedgerHash() const override + { + static uint256 hash{}; + return hash; + } + bool + hasLedger(uint256 const& hash, std::uint32_t seq) const override + { + return false; + } + void + ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const override + { + } + bool + hasShard(std::uint32_t shardIndex) const override + { + return false; + } + bool + hasTxSet(uint256 const& hash) const override + { + return false; + } + void + cycleStatus() override + { + } + bool + hasRange(std::uint32_t uMin, std::uint32_t uMax) override + { + return false; + } + bool + compressionEnabled() const override + { + return false; + } +}; + +/** Manually advanced clock. 
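 * Time only moves when a test calls advance()/randAdvance(), so aging
 * behavior (IDLED, MAX_UNSQUELCH_EXPIRE) can be exercised deterministically
 * without sleeping.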
*/ +class ManualClock +{ +public: + typedef uint64_t rep; + typedef std::milli period; + typedef std::chrono::duration duration; + typedef std::chrono::time_point time_point; + inline static const bool is_steady = false; + + static void + advance(duration d) noexcept + { + now_ += d; + } + + static void + randAdvance(milliseconds min, milliseconds max) + { + now_ += randDuration(min, max); + } + + static void + reset() noexcept + { + now_ = time_point(seconds(0)); + } + + static time_point + now() noexcept + { + return now_; + } + + static duration + randDuration(milliseconds min, milliseconds max) + { + return duration(milliseconds(rand_int(min.count(), max.count()))); + } + + explicit ManualClock() = default; + +private: + inline static time_point now_ = time_point(seconds(0)); +}; + +/** Simulate server's OverlayImpl */ +class Overlay +{ +public: + Overlay() = default; + virtual ~Overlay() = default; + + virtual void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id, + SquelchCB f, + protocol::MessageType type = protocol::mtVALIDATION) = 0; + + virtual void deleteIdlePeers(UnsquelchCB) = 0; + + virtual void deletePeer(Peer::id_t, UnsquelchCB) = 0; +}; + +class Validator; + +/** Simulate link from a validator to a peer directly connected + * to the server. + */ +class Link +{ + using Latency = std::pair; + +public: + Link( + Validator& validator, + PeerSPtr peer, + Latency const& latency = {milliseconds(5), milliseconds(15)}) + : validator_(validator), peer_(peer), latency_(latency), up_(true) + { + auto sp = peer_.lock(); + assert(sp); + } + ~Link() = default; + void + send(MessageSPtr const& m, SquelchCB f) + { + if (!up_) + return; + auto sp = peer_.lock(); + assert(sp); + auto peer = std::dynamic_pointer_cast(sp); + peer->onMessage(m, f); + } + Validator& + validator() + { + return validator_; + } + void + up(bool linkUp) + { + up_ = linkUp; + } + Peer::id_t + peerId() + { + auto p = peer_.lock(); + assert(p); + return p->id(); + } + PeerSPtr + getPeer() + { + auto p = peer_.lock(); + assert(p); + return p; + } + +private: + Validator& validator_; + PeerWPtr peer_; + Latency latency_; + bool up_; +}; + +/** Simulate Validator */ +class Validator +{ + using Links = std::unordered_map; + +public: + Validator() + { + pkey_ = std::get<0>(randomKeyPair(KeyType::ed25519)); + protocol::TMValidation v; + v.set_validation("validation"); + message_ = std::make_shared(v, protocol::mtVALIDATION, pkey_); + id_ = sid_++; + } + Validator(Validator const&) = default; + Validator(Validator&&) = default; + Validator& + operator=(Validator const&) = default; + Validator& + operator=(Validator&&) = default; + ~Validator() + { + clear(); + } + + void + clear() + { + links_.clear(); + } + + static void + resetId() + { + sid_ = 0; + } + + PublicKey const& + key() + { + return pkey_; + } + + operator PublicKey() const + { + return pkey_; + } + + void + addPeer(PeerSPtr peer) + { + links_.emplace( + std::make_pair(peer->id(), std::make_shared(*this, peer))); + } + + void + deletePeer(Peer::id_t id) + { + links_.erase(id); + } + + void + for_links(std::vector peers, LinkIterCB f) + { + for (auto id : peers) + { + assert(links_.find(id) != links_.end()); + f(*links_[id], message_); + } + } + + void + for_links(LinkIterCB f, bool simulateSlow = false) + { + std::vector v; + std::transform( + links_.begin(), links_.end(), std::back_inserter(v), [](auto& kv) { + return kv.second; + }); + std::random_device d; + std::mt19937 g(d()); + std::shuffle(v.begin(), v.end(), g); + 
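+        // Deliver in random order so that no link is systematically first;
+        // the slot's peer selection should not depend on arrival order.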
+ for (auto& link : v) + { + f(*link, message_); + } + } + + /** Send to specific peers */ + void + send(std::vector<Peer::id_t> peers, SquelchCB f) + { + for_links(peers, [&](Link& link, MessageSPtr m) { link.send(m, f); }); + } + + /** Send to all peers */ + void + send(SquelchCB f) + { + for_links([&](Link& link, MessageSPtr m) { link.send(m, f); }); + } + + MessageSPtr + message() + { + return message_; + } + + std::uint16_t + id() + { + return id_; + } + + void + linkUp(Peer::id_t id) + { + auto it = links_.find(id); + assert(it != links_.end()); + it->second->up(true); + } + + void + linkDown(Peer::id_t id) + { + auto it = links_.find(id); + assert(it != links_.end()); + it->second->up(false); + } + +private: + Links links_; + PublicKey pkey_{}; + MessageSPtr message_ = nullptr; + inline static std::uint16_t sid_ = 0; + std::uint16_t id_ = 0; +}; + +class PeerSim : public PeerPartial, public std::enable_shared_from_this<PeerSim> +{ +public: + using id_t = Peer::id_t; + PeerSim(Overlay& overlay) : overlay_(overlay) + { + id_ = sid_++; + } + + ~PeerSim() = default; + + id_t + id() const override + { + return id_; + } + + static void + resetId() + { + sid_ = 0; + } + + /** Local Peer (PeerImp) */ + void + onMessage(MessageSPtr const& m, SquelchCB f) override + { + auto validator = m->getValidatorKey(); + assert(validator); + if (squelch_.isSquelched(*validator)) + return; + + overlay_.updateSlotAndSquelch({}, *validator, id(), f); + } + + /** Remote Peer (Directly connected Peer) */ + virtual void + onMessage(protocol::TMSquelch const& squelch) override + { + auto validator = squelch.validatorpubkey(); + PublicKey key(Slice(validator.data(), validator.size())); + squelch_.squelch(key, squelch.squelch(), squelch.squelchduration()); + } + +private: + inline static id_t sid_ = 0; + id_t id_; + Overlay& overlay_; + squelch::Squelch<ManualClock> squelch_; +}; + +class OverlaySim : public Overlay, public squelch::SquelchHandler +{ + using Peers = std::unordered_map<Peer::id_t, PeerSPtr>; + +public: + using id_t = Peer::id_t; + using clock_type = ManualClock; + OverlaySim(Application& app) : slots_(app, *this) + { + } + + ~OverlaySim() = default; + + void + clear() + { + peers_.clear(); + ManualClock::advance(hours(1)); + slots_.deleteIdlePeers(); + } + + std::uint16_t + inState(PublicKey const& validator, squelch::PeerState state) + { + auto res = slots_.inState(validator, state); + return res ? *res : 0; + } +
+ void + updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id, + SquelchCB f, + protocol::MessageType type = protocol::mtVALIDATION) override + { + squelch_ = f; + slots_.updateSlotAndSquelch(key, validator, id, type); + } + + void + deletePeer(id_t id, UnsquelchCB f) override + { + unsquelch_ = f; + slots_.deletePeer(id, true); + } + + void + deleteIdlePeers(UnsquelchCB f) override + { + unsquelch_ = f; + slots_.deleteIdlePeers(); + } + + PeerSPtr + addPeer(bool useCache = true) + { + PeerSPtr peer{}; + Peer::id_t id; + if (peersCache_.empty() || !useCache) + { + peer = std::make_shared<PeerSim>(*this); + id = peer->id(); + } + else + { + auto it = peersCache_.begin(); + peer = it->second; + id = it->first; + peersCache_.erase(it); + } + peers_.emplace(std::make_pair(id, peer)); + return peer; + } + + void + deletePeer(Peer::id_t id, bool useCache = true) + { + auto it = peers_.find(id); + assert(it != peers_.end()); + deletePeer(id, [&](PublicKey const&, PeerWPtr) {}); + if (useCache) + peersCache_.emplace(std::make_pair(id, it->second)); + peers_.erase(it); + } + + void + resetPeers() + { + while (!peers_.empty()) + deletePeer(peers_.begin()->first); + while (!peersCache_.empty()) + addPeer(); + } + + std::optional<Peer::id_t> + deleteLastPeer() + { + if (peers_.empty()) + return {}; + + // find the largest peer id + Peer::id_t maxId = 0; + + for (auto& [id, _] : peers_) + { + (void)_; + if (id > maxId) + maxId = id; + } + + deletePeer(maxId, false); + + return maxId; + } + + bool + isCountingState(PublicKey const& validator) + { + return slots_.inState(validator, squelch::SlotState::Counting); + } + + std::set<id_t> + getSelected(PublicKey const& validator) + { + return slots_.getSelected(validator); + } + + bool + isSelected(PublicKey const& validator, Peer::id_t peer) + { + auto selected = slots_.getSelected(validator); + return selected.find(peer) != selected.end(); + } + + id_t + getSelectedPeer(PublicKey const& validator) + { + auto selected = slots_.getSelected(validator); + assert(selected.size()); + return *selected.begin(); + } + + std::unordered_map< + id_t, + std::tuple< + squelch::PeerState, + std::uint16_t, + std::uint32_t, + std::uint32_t>> + getPeers(PublicKey const& validator) + { + return slots_.getPeers(validator); + } + + std::uint16_t + getNumPeers() const + { + return peers_.size(); + } + +private: + void + squelch( + PublicKey const& validator, + Peer::id_t id, + std::uint32_t squelchDuration) const override + { + if (auto it = peers_.find(id); it != peers_.end()) + squelch_(validator, it->second, squelchDuration); + } + void + unsquelch(PublicKey const& validator, Peer::id_t id) const override + { + if (auto it = peers_.find(id); it != peers_.end()) + unsquelch_(validator, it->second); + } + SquelchCB squelch_; + UnsquelchCB unsquelch_; + Peers peers_; + Peers peersCache_; + squelch::Slots<ManualClock> slots_; +}; + +class Network +{ +public: + Network(Application& app) : overlay_(app) + { + init(); + } + + void + init() + { + validators_.resize(MAX_VALIDATORS); + // create MAX_PEERS peers and connect each one to every validator + for (int p = 0; p < MAX_PEERS; p++) + { + auto peer = overlay_.addPeer(); + for (auto& v : validators_) + v.addPeer(peer); + } + } + + ~Network() = default; + + void + reset() + { + validators_.clear(); + overlay_.clear(); + PeerSim::resetId(); + Validator::resetId(); + init(); + } + + Peer::id_t + addPeer() + { + auto peer = overlay_.addPeer(); + for (auto& v : validators_) + v.addPeer(peer); + return peer->id(); + } + + void + deleteLastPeer() + { + auto id = overlay_.deleteLastPeer(); + + if (!id) + return; +
+ for (auto& validator : validators_) + validator.deletePeer(*id); + } + + void + purgePeers() + { + while (overlay_.getNumPeers() > MAX_PEERS) + deleteLastPeer(); + } + + Validator& + validator(std::uint16_t v) + { + assert(v < validators_.size()); + return validators_[v]; + } + + OverlaySim& + overlay() + { + return overlay_; + } + + void + enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable) + { + auto it = + std::find_if(validators_.begin(), validators_.end(), [&](auto& v) { + return v.id() == validatorId; + }); + assert(it != validators_.end()); + if (enable) + it->linkUp(peer); + else + it->linkDown(peer); + } + + void + onDisconnectPeer(Peer::id_t peer) + { + // Send unsquelch to the Peer on all links. This way when + // the Peer "reconnects" it starts sending messages on the link. + // We expect that if a Peer disconnects and then reconnects, it's + // unsquelched. + protocol::TMSquelch squelch; + squelch.set_squelch(false); + for (auto& v : validators_) + { + PublicKey key = v; + squelch.clear_validatorpubkey(); + squelch.set_validatorpubkey(key.data(), key.size()); + v.for_links({peer}, [&](Link& l, MessageSPtr) { + std::dynamic_pointer_cast<PeerSim>(l.getPeer())->send(squelch); + }); + } + } + + void + for_rand( + std::uint32_t min, + std::uint32_t max, + std::function<void(std::uint32_t)> f) + { + auto size = max - min; + std::vector<std::uint32_t> s(size); + std::iota(s.begin(), s.end(), min); + std::random_device d; + std::mt19937 g(d()); + std::shuffle(s.begin(), s.end(), g); + for (auto v : s) + f(v); + } + + void + propagate( + LinkIterCB link, + std::uint16_t nValidators = MAX_VALIDATORS, + std::uint32_t nMessages = MAX_MESSAGES, + bool purge = true, + bool resetClock = true) + { + if (resetClock) + ManualClock::reset(); + + if (purge) + { + purgePeers(); + overlay_.resetPeers(); + } + + for (int m = 0; m < nMessages; ++m) + { + ManualClock::randAdvance(milliseconds(1800), milliseconds(2200)); + for_rand(0, nValidators, [&](std::uint32_t v) { + validators_[v].for_links(link); + }); + } + } + + /** Is peer in Selected state in any of the slots */ + bool + isSelected(Peer::id_t id) + { + for (auto& v : validators_) + { + if (overlay_.isSelected(v, id)) + return true; + } + return false; + } + + /** Check that no unsquelch is expected: returns true if, in every slot + * where the peer is in Selected state, there are no peers in Squelched + * state. + */ + bool + allCounting(Peer::id_t peer) + { + for (auto& v : validators_) + { + if (!overlay_.isSelected(v, peer)) + continue; + auto peers = overlay_.getPeers(v); + for (auto& [_, v] : peers) + { + (void)_; + if (std::get<squelch::PeerState>(v) == + squelch::PeerState::Squelched) + return false; + } + } + return true; + } + +private: + OverlaySim overlay_; + std::vector<Validator> validators_; +}; + +class reduce_relay_test : public beast::unit_test::suite +{ + using Slot = squelch::Slot<ManualClock>; + using id_t = Peer::id_t; + +protected: + void + printPeers(const std::string& msg, std::uint16_t validator = 0) + { + auto peers = network_.overlay().getPeers(network_.validator(validator)); + std::cout << msg << " " + << "num peers " << (int)network_.overlay().getNumPeers() + << std::endl; + for (auto& [k, v] : peers) + std::cout << k << ":" << (int)std::get<squelch::PeerState>(v) + << " "; + std::cout << std::endl; + } +
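+ // The helper below emits one of two TMSquelch wire messages (field + // values shown are illustrative, not normative): a squelch, roughly + // {squelch: true, validatorpubkey: <key>, squelchduration: <duration>}, + // or an unsquelch, {squelch: false, validatorpubkey: <key>}.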
+ /** Send squelch (if duration is set) or unsquelch (if duration not set) */ + Peer::id_t + sendSquelch( + PublicKey const& validator, + PeerWPtr const& peerPtr, + std::optional<std::uint32_t> duration) + { + protocol::TMSquelch squelch; + bool res = duration.has_value(); + squelch.set_squelch(res); + squelch.set_validatorpubkey(validator.data(), validator.size()); + if (res) + squelch.set_squelchduration(*duration); + auto sp = peerPtr.lock(); + assert(sp); + std::dynamic_pointer_cast<PeerSim>(sp)->send(squelch); + return sp->id(); + } + + enum State { On, Off, WaitReset }; + enum EventType { LinkDown = 0, PeerDisconnected = 1 }; + // Link down or Peer disconnect event + // TBD - add new peer event + // TBD - add overlapping types of events at any + // time in any quantity + struct Event + { + State state_ = State::Off; + std::uint32_t cnt_ = 0; + std::uint32_t handledCnt_ = 0; + bool isSelected_ = false; + Peer::id_t peer_; + std::uint16_t validator_; + PublicKey key_; + time_point time_; + bool handled_ = false; + }; + + /** Randomly brings the link between a validator and a peer down. + * Randomly disconnects a peer. These events are generated one at a time. + */ + void + random(bool log) + { + std::unordered_map<EventType, Event> events{ + {LinkDown, {}}, {PeerDisconnected, {}}}; + time_point lastCheck = ManualClock::now(); + + network_.reset(); + network_.propagate([&](Link& link, MessageSPtr m) { + auto& validator = link.validator(); + auto now = ManualClock::now(); + + bool squelched = false; + std::stringstream str; + + link.send( + m, + [&](PublicKey const& key, + PeerWPtr const& peerPtr, + std::uint32_t duration) { + assert(key == validator); + auto p = sendSquelch(key, peerPtr, duration); + squelched = true; + str << p << " "; + }); + + if (squelched) + { + auto selected = network_.overlay().getSelected(validator); + str << " selected: "; + for (auto s : selected) + str << s << " "; + if (log) + std::cout + << (double)squelch::epoch<milliseconds>(now).count() / + 1000. + << " random, squelched, validator: " << validator.id() + << " peers: " << str.str() << std::endl; + auto countingState = + network_.overlay().isCountingState(validator); + BEAST_EXPECT( + countingState == false && + selected.size() == squelch::MAX_SELECTED_PEERS); + } + + // Trigger Link Down or Peer Disconnect event + // Only one Link Down at a time + if (events[EventType::LinkDown].state_ == State::Off) + { + auto update = [&](EventType event) { + events[event].cnt_++; + events[event].validator_ = validator.id(); + events[event].key_ = validator; + events[event].peer_ = link.peerId(); + events[event].state_ = State::On; + events[event].time_ = now; + if (event == EventType::LinkDown) + { + network_.enableLink( + validator.id(), link.peerId(), false); + events[event].isSelected_ = + network_.overlay().isSelected( + validator, link.peerId()); + } + else + events[event].isSelected_ = + network_.isSelected(link.peerId()); + }; + auto r = rand_int(0, 1000); + if (r == (int)EventType::LinkDown || + r == (int)EventType::PeerDisconnected) + { + update(static_cast<EventType>(r)); + } + } + + if (events[EventType::PeerDisconnected].state_ == State::On) + { + auto& event = events[EventType::PeerDisconnected]; + bool allCounting = network_.allCounting(event.peer_); + network_.overlay().deletePeer( + event.peer_, + [&](PublicKey const& v, PeerWPtr const& peerPtr) { + if (event.isSelected_) + sendSquelch(v, peerPtr, {}); + event.handled_ = true; + }); + // The peer should only be unsquelched if it is in Selected + // state. Even then, unsquelching may not take place if there + // are no peers in Squelched state in any of the slots where + // the peer is in Selected state (allCounting is true). + bool handled = + (event.isSelected_ == false && !event.handled_) || + (event.isSelected_ == true && + (event.handled_ || allCounting));
+ BEAST_EXPECT(handled); + event.state_ = State::Off; + event.isSelected_ = false; + event.handledCnt_ += handled; + event.handled_ = false; + network_.onDisconnectPeer(event.peer_); + } + + auto& event = events[EventType::LinkDown]; + // Check every second for idled peers. Idled peers are + // created by the Link Down event. + if (now - lastCheck > milliseconds(1000)) + { + lastCheck = now; + // Check if the Link Down event must be handled by + // deleteIdlePeers(): 1) the peer is in Selected state; + // 2) the peer has not received any messages for IDLED time; + // 3) there are peers in Squelched state in the slot; + // 4) the peer is in Slot's peers_ (if not then it is deleted + // by Slots::deleteIdlePeers()) + bool mustHandle = false; + if (event.state_ == State::On) + { + event.isSelected_ = + network_.overlay().isSelected(event.key_, event.peer_); + auto peers = network_.overlay().getPeers(event.key_); + auto d = squelch::epoch<milliseconds>(now).count() - + std::get<3>(peers[event.peer_]); + mustHandle = event.isSelected_ && + d > milliseconds(squelch::IDLED).count() && + network_.overlay().inState( + event.key_, squelch::PeerState::Squelched) > 0 && + peers.find(event.peer_) != peers.end(); + } + network_.overlay().deleteIdlePeers( + [&](PublicKey const& v, PeerWPtr const& ptr) { + event.handled_ = true; + if (mustHandle && v == event.key_) + { + event.state_ = State::WaitReset; + sendSquelch(validator, ptr, {}); + } + }); + bool handled = + (event.handled_ && event.state_ == State::WaitReset) || + (!event.handled_ && !mustHandle); + BEAST_EXPECT(handled); + } + if (event.state_ == State::WaitReset || + (event.state_ == State::On && + (now - event.time_ > (squelch::IDLED + seconds(2))))) + { + bool handled = + event.state_ == State::WaitReset || !event.handled_; + BEAST_EXPECT(handled); + event.state_ = State::Off; + event.isSelected_ = false; + event.handledCnt_ += handled; + event.handled_ = false; + network_.enableLink(event.validator_, event.peer_, true); + } + }); + + auto& down = events[EventType::LinkDown]; + auto& disconnected = events[EventType::PeerDisconnected]; + // It's possible the last Link Down event is not handled + BEAST_EXPECT(down.handledCnt_ >= down.cnt_ - 1); + // All Peer Disconnect events must be handled + BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_); + if (log) + std::cout << "link down count: " << down.cnt_ << "/" + << down.handledCnt_ + << " peer disconnect count: " << disconnected.cnt_ << "/" + << disconnected.handledCnt_; + } + + bool + checkCounting(PublicKey const& validator, bool isCountingState) + { + auto countingState = network_.overlay().isCountingState(validator); + BEAST_EXPECT(countingState == isCountingState); + return countingState == isCountingState; + } + + void + doTest(const std::string& msg, bool log, std::function<void(bool)> f) + { + testcase(msg); + f(log); + } + + /** Initial counting round: three peers receive the message "faster" than + * the others. Once the message count for the three peers reaches the + * threshold, the rest of the peers are squelched and the slot for the + * given validator is in Selected state. + */ + void + testInitialRound(bool log) + { + doTest("Initial Round", log, [this](bool log) { + BEAST_EXPECT(propagateAndSquelch(log)); + }); + } +
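+ // Informal summary (drawn from the scenarios in this file, not from the + // protocol spec) of the slot lifecycle exercised by the tests below: a + // slot starts in Counting state; once enough messages are counted from + // squelch::MAX_SELECTED_PEERS peers, those peers become Selected and the + // remaining peers are Squelched; counting resumes when a squelch expires, + // a selected peer disconnects, or a selected peer idles for + // squelch::IDLED. +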
+ /** Receiving a message from a squelched peer too soon should not change + * the slot's state to Counting. + */ + void + testPeerUnsquelchedTooSoon(bool log) + { + doTest("Peer Unsquelched Too Soon", log, [this](bool log) { + BEAST_EXPECT(propagateNoSquelch(log, 1, false, false, false)); + }); + } + + /** Receiving a message from a squelched peer should change the + * slot's state to Counting. + */ + void + testPeerUnsquelched(bool log) + { + // advance the clock past the longest possible squelch duration so + // that any earlier squelch has expired + ManualClock::advance(seconds(601)); + doTest("Peer Unsquelched", log, [this](bool log) { + BEAST_EXPECT(propagateNoSquelch(log, 2, true, true, false)); + }); + } + + /** Propagate enough messages to generate one squelch event */ + bool + propagateAndSquelch(bool log, bool purge = true, bool resetClock = true) + { + int n = 0; + network_.propagate( + [&](Link& link, MessageSPtr message) { + std::uint16_t squelched = 0; + link.send( + message, + [&](PublicKey const& key, + PeerWPtr const& peerPtr, + std::uint32_t duration) { + squelched++; + sendSquelch(key, peerPtr, duration); + }); + if (squelched) + { + BEAST_EXPECT( + squelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + n++; + } + }, + 1, + squelch::MAX_MESSAGE_THRESHOLD + 2, + purge, + resetClock); + auto selected = network_.overlay().getSelected(network_.validator(0)); + BEAST_EXPECT(selected.size() == squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(n == 1); // only one selection round + auto res = checkCounting(network_.validator(0), false); + BEAST_EXPECT(res); + return n == 1 && res; + } + + /** Send fewer messages so that no squelch event is generated */ + bool + propagateNoSquelch( + bool log, + std::uint16_t nMessages, + bool countingState, + bool purge = true, + bool resetClock = true) + { + bool squelched = false; + network_.propagate( + [&](Link& link, MessageSPtr message) { + link.send( + message, + [&](PublicKey const& key, + PeerWPtr const& peerPtr, + std::uint32_t duration) { + squelched = true; + BEAST_EXPECT(false); + }); + }, + 1, + nMessages, + purge, + resetClock); + auto res = checkCounting(network_.validator(0), countingState); + return !squelched && res; + } + + /** Receiving a message from a new peer should change the + * slot's state to Counting. + */ + void + testNewPeer(bool log) + { + doTest("New Peer", log, [this](bool log) { + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + network_.addPeer(); + BEAST_EXPECT(propagateNoSquelch(log, 1, true, false, false)); + }); + } + + /** Selected peer disconnects. Should change the slot's state to Counting + * and unsquelch squelched peers. */ + void + testSelectedPeerDisconnects(bool log) + { + doTest("Selected Peer Disconnects", log, [this](bool log) { + ManualClock::advance(seconds(601)); + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + auto id = network_.overlay().getSelectedPeer(network_.validator(0)); + std::uint16_t unsquelched = 0; + network_.overlay().deletePeer( + id, [&](PublicKey const& key, PeerWPtr const& peer) { + unsquelched++; + }); + BEAST_EXPECT( + unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(checkCounting(network_.validator(0), true)); + }); + } +
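+ // The next test relies on idle detection (ManualClock stands in for the + // slots' clock here): advancing the clock past squelch::IDLED without + // relaying any messages makes the selected peer look idle, so + // deleteIdlePeers() is expected to unsquelch the squelched peers and + // return the slot to Counting state. +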
+ /** Selected peer stops relaying. Should change the slot's state to + * Counting and unsquelch squelched peers. */ + void + testSelectedPeerStopsRelaying(bool log) + { + doTest("Selected Peer Stops Relaying", log, [this](bool log) { + ManualClock::advance(seconds(601)); + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + ManualClock::advance(squelch::IDLED + seconds(1)); + std::uint16_t unsquelched = 0; + network_.overlay().deleteIdlePeers( + [&](PublicKey const& key, PeerWPtr const& peer) { + unsquelched++; + }); + auto peers = network_.overlay().getPeers(network_.validator(0)); + BEAST_EXPECT( + unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(checkCounting(network_.validator(0), true)); + }); + } + + /** Squelched peer disconnects. Should not change the slot's state to + * Counting. + */ + void + testSquelchedPeerDisconnects(bool log) + { + doTest("Squelched Peer Disconnects", log, [this](bool log) { + ManualClock::advance(seconds(601)); + BEAST_EXPECT(propagateAndSquelch(log, true, false)); + auto peers = network_.overlay().getPeers(network_.validator(0)); + auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { + return std::get<squelch::PeerState>(it.second) == + squelch::PeerState::Squelched; + }); + assert(it != peers.end()); + std::uint16_t unsquelched = 0; + network_.overlay().deletePeer( + it->first, [&](PublicKey const& key, PeerWPtr const& peer) { + unsquelched++; + }); + BEAST_EXPECT(unsquelched == 0); + BEAST_EXPECT(checkCounting(network_.validator(0), false)); + }); + } + + void + testConfig(bool log) + { + doTest("Config Test", log, [&](bool log) { + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +enable=1 +squelch=1 +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.REDUCE_RELAY_ENABLE == true); + BEAST_EXPECT(c.REDUCE_RELAY_SQUELCH == true); + + Config c1; + + toLoad = (R"rippleConfig( +[reduce_relay] +enable=0 +squelch=0 +)rippleConfig"); + + c1.loadFromString(toLoad); + BEAST_EXPECT(c1.REDUCE_RELAY_ENABLE == false); + BEAST_EXPECT(c1.REDUCE_RELAY_SQUELCH == false); + + Config c2; + + // mis-spelled keys are ignored and the defaults remain + toLoad = R"rippleConfig( +[reduce_relay] +enabled=1 +squelched=1 +)rippleConfig"; + + c2.loadFromString(toLoad); + BEAST_EXPECT(c2.REDUCE_RELAY_ENABLE == false); + BEAST_EXPECT(c2.REDUCE_RELAY_SQUELCH == false); + }); + } +
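+ // For reference, an equivalent rippled.cfg stanza (keys as parsed above; + // both values default to false when the section or a key is absent): + // + // [reduce_relay] + // enable=1 + // squelch=1 +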
+ void + testInternalHashRouter(bool log) + { + doTest("Duplicate Message", log, [&](bool log) { + network_.reset(); + // update message count for the same peer/validator + std::int16_t nMessages = 5; + for (int i = 0; i < nMessages; i++) + { + uint256 key(i); + network_.overlay().updateSlotAndSquelch( + key, + network_.validator(0), + 0, + [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); + } + auto peers = network_.overlay().getPeers(network_.validator(0)); + // the first message changes the Slot state to Counting and is not + // counted, hence '-1' + BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); + // add a duplicate + uint256 key(nMessages - 1); + network_.overlay().updateSlotAndSquelch( + key, + network_.validator(0), + 0, + [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); + // confirm the same number of messages + peers = network_.overlay().getPeers(network_.validator(0)); + BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); + // advance the clock + ManualClock::advance(squelch::IDLED + seconds(1)); + network_.overlay().updateSlotAndSquelch( + key, + network_.validator(0), + 0, + [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); + peers = network_.overlay().getPeers(network_.validator(0)); + // confirm message number increased + BEAST_EXPECT(std::get<1>(peers[0]) == nMessages); + }); + } + + jtx::Env env_; + Network network_; + +public: + reduce_relay_test() : env_(*this), network_(env_.app()) + { + } + + void + run() override + { + bool log = false; + testConfig(log); + testInitialRound(log); + testPeerUnsquelchedTooSoon(log); + testPeerUnsquelched(log); + testNewPeer(log); + testSquelchedPeerDisconnects(log); + testSelectedPeerDisconnects(log); + testSelectedPeerStopsRelaying(log); + testInternalHashRouter(log); + } +}; + +class reduce_relay_simulate_test : public reduce_relay_test +{ + void + testRandom(bool log) + { + doTest("Random Test", log, [&](bool log) { random(log); }); + } + + void + run() override + { + bool log = false; + testRandom(log); + } +}; + +BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple); +// The simulation suite is registered as manual; run it explicitly, e.g. +// rippled --unittest ripple.ripple_data.reduce_relay_simulate +BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple); + +} // namespace test + +} // namespace ripple \ No newline at end of file