Optimize relaying of validation and proposal messages:

With few exceptions, servers will typically receive multiple copies
of any given message from their directly connected peers. For servers
with several peers this can increase processing latency and force
them to do redundant work. Proposal and validation messages are often
relayed with extremely high redundancy.

This commit, if merged, introduces experimental code that attempts
to optimize the relaying of proposals and validations by allowing
servers to instruct their peers to "squelch" delivery of selected
proposals and validations. Servers make squelching decisions via
a process that evaluates the fitness and performance of each peer
and randomly selects a subset of the best candidates.

The experimental code is presently disabled and must be explicitly
enabled by server operators who wish to test it.
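
For operators who want to experiment, enabling it takes a small
configuration stanza. The section and key names below come from the
[reduce_relay] parsing added to Config::loadFromString in this commit;
the 0/1 values are an assumption based on rippled's usual boolean
config entries:

[reduce_relay]
enable=1
squelch=1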
Author: Gregory Tsipenyuk
Date: 2020-04-12 10:03:03 -04:00
Committed by: Nik Bougalis
Parent: 7724cca384
Commit: 9b9f34f881
23 changed files with 2714 additions and 36 deletions


@@ -867,6 +867,7 @@ target_sources (rippled PRIVATE
src/test/overlay/cluster_test.cpp
src/test/overlay/short_read_test.cpp
src/test/overlay/compression_test.cpp
src/test/overlay/reduce_relay_test.cpp
#[===============================[
test sources:
subdir: peerfinder


@@ -136,6 +136,7 @@ else
# ORDER matters here...sorted in approximately
# descending execution time (longest running tests at top)
declare -a manual_tests=(
'ripple.ripple_data.reduce_relay_simulate'
'ripple.ripple_data.digest'
'ripple.tx.Offer_manual'
'ripple.app.PayStrandAllPairs'


@@ -157,7 +157,7 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size());
app_.overlay().relay(prop, peerPos.suppressionID());
app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
}
void


@@ -49,12 +49,18 @@ HashRouter::addSuppression(uint256 const& key)
bool
HashRouter::addSuppressionPeer(uint256 const& key, PeerShortID peer)
{
return addSuppressionPeerWithStatus(key, peer).first;
}
std::pair<bool, std::optional<Stopwatch::time_point>>
HashRouter::addSuppressionPeerWithStatus(const uint256& key, PeerShortID peer)
{
std::lock_guard lock(mutex_);
auto result = emplace(key);
result.first.addPeer(peer);
return result.second;
return {result.second, result.first.relayed()};
}
bool
@@ -110,14 +116,14 @@ HashRouter::setFlags(uint256 const& key, int flags)
auto
HashRouter::shouldRelay(uint256 const& key)
-> boost::optional<std::set<PeerShortID>>
-> std::optional<std::set<PeerShortID>>
{
std::lock_guard lock(mutex_);
auto& s = emplace(key).first;
if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
return boost::none;
return {};
return s.releasePeerSet();
}
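
A hedged sketch of how a caller consumes the new pair-returning API;
the real call sites appear in PeerImp::onMessage later in this commit.
Here router, overlay, key, validator, peerID, and type are placeholders:

// Sketch only: mirrors the pattern used in PeerImp::onMessage.
auto [added, relayed] = router.addSuppressionPeerWithStatus(key, peerID);
if (!added)
{
    // Duplicate from this peer. If the message was relayed within the
    // last IDLED seconds, it still counts toward the squelch logic.
    if (relayed && (stopwatch().now() - *relayed) < squelch::IDLED)
        overlay.updateSlotAndSquelch(key, validator, peerID, type);
    return;  // duplicate; do not process again
}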


@@ -97,6 +97,13 @@ private:
return std::move(peers_);
}
/** Return seated relay time point if the message has been relayed */
std::optional<Stopwatch::time_point>
relayed() const
{
return relayed_;
}
/** Determines if this item should be relayed.
Checks whether the item has been recently relayed.
@@ -142,8 +149,8 @@ private:
std::set<PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
boost::optional<Stopwatch::time_point> processed_;
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::uint32_t recoveries_ = 0;
};
@@ -185,6 +192,14 @@ public:
bool
addSuppressionPeer(uint256 const& key, PeerShortID peer);
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 2: optional is seated to the relay time point or
* is unseated if the message has not been relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>
addSuppressionPeerWithStatus(uint256 const& key, PeerShortID peer);
bool
addSuppressionPeer(uint256 const& key, PeerShortID peer, int& flags);
@@ -214,11 +229,11 @@ public:
return `true` again until the hold time has expired.
The internal set of peers will also be reset.
@return A `boost::optional` set of peers which do not need to be
@return A `std::optional` set of peers which do not need to be
relayed to. If the result is uninitialized, the item should
_not_ be relayed.
*/
boost::optional<std::set<PeerShortID>>
std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key);
/** Determines whether the hashed item should be recovered


@@ -179,6 +179,12 @@ public:
// Thread pool configuration
std::size_t WORKERS = 0;
// Reduce-relay - these parameters are experimental.
// Enable reduce-relay functionality
bool REDUCE_RELAY_ENABLE = false;
// Send squelch message to peers
bool REDUCE_RELAY_SQUELCH = false;
// These override the command line client settings
boost::optional<beast::IP::Endpoint> rpc_ip;


@@ -70,6 +70,7 @@ struct ConfigSection
#define SECTION_PATH_SEARCH_MAX "path_search_max"
#define SECTION_PEER_PRIVATE "peer_private"
#define SECTION_PEERS_MAX "peers_max"
#define SECTION_REDUCE_RELAY "reduce_relay"
#define SECTION_RELAY_PROPOSALS "relay_proposals"
#define SECTION_RELAY_VALIDATIONS "relay_validations"
#define SECTION_RPC_STARTUP "rpc_startup"


@@ -481,6 +481,13 @@ Config::loadFromString(std::string const& fileContents)
if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);
if (exists(SECTION_REDUCE_RELAY))
{
auto sec = section(SECTION_REDUCE_RELAY);
REDUCE_RELAY_ENABLE = sec.value_or("enable", false);
REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false);
}
if (getSingleSection(
secConfig, SECTION_AMENDMENT_MAJORITY_TIME, strTemp, j_))
{


@@ -21,6 +21,7 @@
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#include <ripple/overlay/Compression.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/messages.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp>
@@ -55,8 +56,13 @@ public:
/** Constructor
* @param message Protocol message to serialize
* @param type Protocol message type
* @param validator Public Key of the source validator for Validation or
* Proposal message. Used to check if the message should be squelched.
*/
Message(::google::protobuf::Message const& message, int type);
Message(
::google::protobuf::Message const& message,
int type,
boost::optional<PublicKey> const& validator = {});
/** Retrieve the packed message data. If compressed message is requested but
* the message is not compressible then the uncompressed buffer is returned.
@@ -74,11 +80,19 @@ public:
return category_;
}
/** Get the validator's key */
boost::optional<PublicKey> const&
getValidatorKey() const
{
return validatorKey_;
}
private:
std::vector<uint8_t> buffer_;
std::vector<uint8_t> bufferCompressed_;
std::size_t category_;
std::once_flag once_flag_;
boost::optional<PublicKey> validatorKey_;
/** Set the payload header
* @param in Pointer to the payload


@@ -133,7 +133,7 @@ public:
/** Returns the peer with the matching short id, or null. */
virtual std::shared_ptr<Peer>
findPeerByShortID(Peer::id_t const& id) = 0;
findPeerByShortID(Peer::id_t const& id) const = 0;
/** Returns the peer with the matching public key, or null. */
virtual std::shared_ptr<Peer>
@@ -147,13 +147,29 @@ public:
virtual void
broadcast(protocol::TMValidation& m) = 0;
/** Relay a proposal. */
virtual void
relay(protocol::TMProposeSet& m, uint256 const& uid) = 0;
/** Relay a proposal.
* @param m the serialized proposal
* @param uid the id used to identify this proposal
* @param validator The pubkey of the validator that issued this proposal
* @return the set of peers which have already sent us this proposal
*/
virtual std::set<Peer::id_t>
relay(
protocol::TMProposeSet& m,
uint256 const& uid,
PublicKey const& validator) = 0;
/** Relay a validation. */
virtual void
relay(protocol::TMValidation& m, uint256 const& uid) = 0;
/** Relay a validation.
* @param m the serialized validation
* @param uid the id used to identify this validation
* @param validator The pubkey of the validator that issued this validation
* @return the set of peers which have already sent us this validation
*/
virtual std::set<Peer::id_t>
relay(
protocol::TMValidation& m,
uint256 const& uid,
PublicKey const& validator) = 0;
/** Visit every active peer.
*

src/ripple/overlay/Slot.h (new file, 734 lines)

@@ -0,0 +1,734 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED
#define RIPPLE_OVERLAY_SLOT_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/container/aged_unordered_map.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/overlay/Peer.h>
#include <ripple/overlay/Squelch.h>
#include <ripple/overlay/SquelchCommon.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple.pb.h>
#include <algorithm>
#include <memory>
#include <optional>
#include <set>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
namespace ripple {
namespace squelch {
template <typename clock_type>
class Slots;
/** Peer's State */
enum class PeerState : uint8_t {
Counting, // counting messages
Selected, // selected to relay, counting if Slot in Counting
Squelched, // squelched, doesn't relay
};
/** Slot's State */
enum class SlotState : uint8_t {
Counting, // counting messages
Selected, // peers selected, stop counting
};
template <typename Unit, typename TP>
Unit
epoch(TP const& t)
{
return duration_cast<Unit>(t.time_since_epoch());
}
/** Abstract class. Declares squelch and unsquelch handlers.
* OverlayImpl inherits from this class. The motivation is to make
* unit tests easier by allowing the callbacks to be changed
* on the fly. */
class SquelchHandler
{
public:
virtual ~SquelchHandler()
{
}
/** Squelch handler
* @param validator Public key of the source validator
* @param id Peer's id to squelch
* @param duration Squelch duration in seconds
*/
virtual void
squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration)
const = 0;
/** Unsquelch handler
* @param validator Public key of the source validator
* @param id Peer's id to unsquelch
*/
virtual void
unsquelch(PublicKey const& validator, Peer::id_t id) const = 0;
};
/**
* Slot is associated with a specific validator via validator's public key.
* Slot counts messages from a validator, selects peers to be the source
* of the messages, and communicates the peers to be squelched. Slot can be
in the following states: 1) Counting. This is the peer selection state,
in which the Slot counts the messages and selects the peers; 2) Selected.
The Slot doesn't count messages in the Selected state. A message received
from an unsquelched, disconnected, or idling peer may transition the Slot
back to the Counting state.
*/
template <typename clock_type>
class Slot final
{
private:
friend class Slots<clock_type>;
using id_t = Peer::id_t;
using time_point = typename clock_type::time_point;
/** Constructor
* @param journal Journal for logging
* @param handler Squelch/Unsquelch implementation
*/
Slot(SquelchHandler const& handler, beast::Journal journal)
: reachedThreshold_(0)
, lastSelected_(clock_type::now())
, state_(SlotState::Counting)
, handler_(handler)
, journal_(journal)
{
}
/** Update peer info. If the message is from a new
* peer or from a previously expired squelched peer then switch
* the peer's and slot's state to Counting. If the time since the last
* selection round is > 2 * MAX_UNSQUELCH_EXPIRE then switch the slot's
* state to Counting. If the number of messages for the peer
* is > MIN_MESSAGE_THRESHOLD then add the peer to the considered peers
* pool. If the number of considered peers that reached
* MAX_MESSAGE_THRESHOLD is MAX_SELECTED_PEERS then randomly select
* MAX_SELECTED_PEERS from the considered peers, and call the squelch
* handler for each peer that is not selected and not already in the
* Squelched state. Set the state for those
* peers to Squelched and reset the count of all peers. Set slot's state to
* Selected. Message count is not updated when the slot is in Selected
* state.
* @param validator Public key of the source validator
* @param id Peer id which received the message
* @param type Message type (Validation and Propose Set only,
* others are ignored, future use)
*/
void
update(PublicKey const& validator, id_t id, protocol::MessageType type);
/** Handle peer deletion when a peer disconnects.
* If the peer is in Selected state then
* call unsquelch handler for every peer in squelched state and reset
* every peer's state to Counting. Switch Slot's state to Counting.
* @param validator Public key of the source validator
* @param id Deleted peer id
* @param erase If true then erase the peer. The peer is not erased
* when it is idled; the peer is erased when it
* disconnects
*/
void
deletePeer(PublicKey const& validator, id_t id, bool erase);
/** Get the time of the last peer selection round */
const time_point&
getLastSelected() const
{
return lastSelected_;
}
/** Return number of peers in state */
std::uint16_t
inState(PeerState state) const;
/** Return number of peers not in state */
std::uint16_t
notInState(PeerState state) const;
/** Return Slot's state */
SlotState
getState() const
{
return state_;
}
/** Return selected peers */
std::set<id_t>
getSelected() const;
/** Get peers info. Return a map of peer's state, count, squelch
* expiration in milliseconds, and last message time in milliseconds.
*/
std::
unordered_map<id_t, std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
getPeers() const;
/** Check if peers stopped relaying messages. If an idled peer is
* a selected peer then call the unsquelch handler for all
* currently squelched peers and switch the slot to
* Counting state.
* @param validator Public key of the source validator
*/
void
deleteIdlePeer(PublicKey const& validator);
private:
/** Reset counts of peers in Selected or Counting state */
void
resetCounts();
/** Initialize slot to Counting state */
void
initCounting();
/** Data maintained for each peer */
struct PeerInfo
{
PeerState state; // peer's state
std::size_t count; // message count
time_point expire; // squelch expiration time
time_point lastMessage; // time last message received
};
std::unordered_map<id_t, PeerInfo> peers_; // peer's data
// pool of peers considered as the source of messages
// from validator - peers that reached MIN_MESSAGE_THRESHOLD
std::unordered_set<id_t> considered_;
// number of peers that reached MAX_MESSAGE_THRESHOLD
std::uint16_t reachedThreshold_;
// last time peers were selected, used to age the slot
typename clock_type::time_point lastSelected_;
SlotState state_; // slot's state
SquelchHandler const& handler_; // squelch/unsquelch handler
beast::Journal const journal_; // logging
};
template <typename clock_type>
void
Slot<clock_type>::deleteIdlePeer(PublicKey const& validator)
{
auto now = clock_type::now();
for (auto it = peers_.begin(); it != peers_.end();)
{
auto& peer = it->second;
auto id = it->first;
++it;
if (now - peer.lastMessage > IDLED)
{
JLOG(journal_.debug())
<< "deleteIdlePeer: " << Slice(validator) << " " << id
<< " idled "
<< duration_cast<seconds>(now - peer.lastMessage).count()
<< " selected " << (peer.state == PeerState::Selected);
deletePeer(validator, id, false);
}
}
}
template <typename clock_type>
void
Slot<clock_type>::update(
PublicKey const& validator,
id_t id,
protocol::MessageType type)
{
auto now = clock_type::now();
auto it = peers_.find(id);
// First message from this peer
if (it == peers_.end())
{
JLOG(journal_.debug())
<< "update: adding peer " << Slice(validator) << " " << id;
peers_.emplace(
std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
initCounting();
return;
}
// Message from a peer with expired squelch
if (it->second.state == PeerState::Squelched && now > it->second.expire)
{
JLOG(journal_.debug())
<< "update: squelch expired " << Slice(validator) << " " << id;
it->second.state = PeerState::Counting;
it->second.lastMessage = now;
initCounting();
return;
}
auto& peer = it->second;
JLOG(journal_.debug())
<< "update: existing peer " << Slice(validator) << " " << id
<< " slot state " << static_cast<int>(state_) << " peer state "
<< static_cast<int>(peer.state) << " count " << peer.count << " last "
<< duration_cast<milliseconds>(now - peer.lastMessage).count()
<< " pool " << considered_.size() << " threshold " << reachedThreshold_
<< " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
peer.lastMessage = now;
if (state_ != SlotState::Counting || peer.state == PeerState::Squelched)
return;
if (++peer.count > MIN_MESSAGE_THRESHOLD)
considered_.insert(id);
if (peer.count == (MAX_MESSAGE_THRESHOLD + 1))
++reachedThreshold_;
if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE)
{
JLOG(journal_.debug())
<< "update: resetting due to inactivity " << Slice(validator) << " "
<< id << " " << duration_cast<seconds>(now - lastSelected_).count();
initCounting();
return;
}
if (reachedThreshold_ == MAX_SELECTED_PEERS)
{
// Randomly select MAX_SELECTED_PEERS peers from considered.
// Exclude peers that have been idling > IDLED -
// it's possible that deleteIdlePeer() has not been called yet.
// If number of remaining peers != MAX_SELECTED_PEERS
// then reset the Counting state and let deleteIdlePeer() handle
// idled peers.
std::unordered_set<id_t> selected;
auto const consideredPoolSize = considered_.size();
while (selected.size() != MAX_SELECTED_PEERS && considered_.size() != 0)
{
auto i =
considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1);
auto it = std::next(considered_.begin(), i);
auto id = *it;
considered_.erase(it);
auto const& itpeers = peers_.find(id);
if (itpeers == peers_.end())
{
JLOG(journal_.error()) << "update: peer not found "
<< Slice(validator) << " " << id;
continue;
}
if (now - itpeers->second.lastMessage < IDLED)
selected.insert(id);
}
if (selected.size() != MAX_SELECTED_PEERS)
{
JLOG(journal_.debug())
<< "update: selection failed " << Slice(validator) << " " << id;
initCounting();
return;
}
lastSelected_ = now;
auto s = selected.begin();
JLOG(journal_.debug())
<< "update: " << Slice(validator) << " " << id << " pool size "
<< consideredPoolSize << " selected " << *s << " "
<< *std::next(s, 1) << " " << *std::next(s, 2);
// squelch peers which are not selected and
// not already squelched
std::stringstream str;
for (auto& [k, v] : peers_)
{
v.count = 0;
if (selected.find(k) != selected.end())
v.state = PeerState::Selected;
else if (v.state != PeerState::Squelched)
{
if (journal_.debug())
str << k << " ";
v.state = PeerState::Squelched;
auto duration = Squelch<clock_type>::getSquelchDuration();
v.expire = now + duration;
handler_.squelch(
validator,
k,
duration_cast<milliseconds>(duration).count());
}
}
JLOG(journal_.debug()) << "update: squelching " << Slice(validator)
<< " " << id << " " << str.str();
considered_.clear();
reachedThreshold_ = 0;
state_ = SlotState::Selected;
}
}
template <typename clock_type>
void
Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase)
{
auto it = peers_.find(id);
if (it != peers_.end())
{
JLOG(journal_.debug())
<< "deletePeer: " << Slice(validator) << " " << id << " selected "
<< (it->second.state == PeerState::Selected) << " considered "
<< (considered_.find(id) != considered_.end()) << " erase "
<< erase;
auto now = clock_type::now();
if (it->second.state == PeerState::Selected)
{
for (auto& [k, v] : peers_)
{
if (v.state == PeerState::Squelched)
handler_.unsquelch(validator, k);
v.state = PeerState::Counting;
v.count = 0;
v.expire = now;
}
considered_.clear();
reachedThreshold_ = 0;
state_ = SlotState::Counting;
}
else if (considered_.find(id) != considered_.end())
{
if (it->second.count > MAX_MESSAGE_THRESHOLD)
--reachedThreshold_;
considered_.erase(id);
}
it->second.lastMessage = now;
it->second.count = 0;
if (erase)
peers_.erase(it);
}
}
template <typename clock_type>
void
Slot<clock_type>::resetCounts()
{
for (auto& [_, peer] : peers_)
{
(void)_;
peer.count = 0;
}
}
template <typename clock_type>
void
Slot<clock_type>::initCounting()
{
state_ = SlotState::Counting;
considered_.clear();
reachedThreshold_ = 0;
resetCounts();
}
template <typename clock_type>
std::uint16_t
Slot<clock_type>::inState(PeerState state) const
{
return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
return (it.second.state == state);
});
}
template <typename clock_type>
std::uint16_t
Slot<clock_type>::notInState(PeerState state) const
{
return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
return (it.second.state != state);
});
}
template <typename clock_type>
std::set<typename Peer::id_t>
Slot<clock_type>::getSelected() const
{
std::set<id_t> init;
return std::accumulate(
peers_.begin(), peers_.end(), init, [](auto& init, auto const& it) {
if (it.second.state == PeerState::Selected)
{
init.insert(it.first);
return init;
}
return init;
});
}
template <typename clock_type>
std::unordered_map<
typename Peer::id_t,
std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
Slot<clock_type>::getPeers() const
{
auto init = std::unordered_map<
id_t,
std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
return std::accumulate(
peers_.begin(), peers_.end(), init, [](auto& init, auto const& it) {
init.emplace(std::make_pair(
it.first,
std::move(std::make_tuple(
it.second.state,
it.second.count,
epoch<milliseconds>(it.second.expire).count(),
epoch<milliseconds>(it.second.lastMessage).count()))));
return init;
});
}
/** Slots is a container of per-validator Slots and handles Slot updates
* when a message is received from a validator. It also handles Slot aging
* and checks for peers that are disconnected or have stopped relaying messages.
*/
template <typename clock_type>
class Slots final
{
using time_point = typename clock_type::time_point;
using id_t = typename Peer::id_t;
using messages = beast::aged_unordered_map<
uint256,
std::unordered_set<Peer::id_t>,
clock_type,
hardened_hash<strong_hash>>;
public:
/**
* @param app Application reference
* @param handler Squelch/unsquelch implementation
*/
Slots(Application& app, SquelchHandler const& handler)
: handler_(handler), app_(app), journal_(app.journal("Slots"))
{
}
~Slots() = default;
/** Calls Slot::update of Slot associated with the validator.
* @param key Message's hash
* @param validator Validator's public key
* @param id Peer's id which received the message
* @param type Received protocol message type
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type);
/** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator.
*/
void
deleteIdlePeers();
/** Return number of peers in state */
std::optional<std::uint16_t>
inState(PublicKey const& validator, PeerState state) const
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.inState(state);
return {};
}
/** Return number of peers not in state */
std::optional<std::uint16_t>
notInState(PublicKey const& validator, PeerState state) const
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.notInState(state);
return {};
}
/** Return true if Slot is in state */
bool
inState(PublicKey const& validator, SlotState state) const
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.state_ == state;
return false;
}
/** Get selected peers */
std::set<id_t>
getSelected(PublicKey const& validator)
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.getSelected();
return {};
}
/** Get peers info. Return a map of peer's state, count, squelch
* expiration, and last message time in milliseconds.
*/
std::unordered_map<
typename Peer::id_t,
std::tuple<PeerState, uint16_t, uint32_t, std::uint32_t>>
getPeers(PublicKey const& validator)
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.getPeers();
return {};
}
/** Get Slot's state */
std::optional<SlotState>
getState(PublicKey const& validator)
{
auto const& it = slots_.find(validator);
if (it != slots_.end())
return it->second.getState();
return {};
}
/** Called when a peer is deleted. If the peer was selected to be the
* source of messages from the validator then squelched peers have to be
* unsquelched.
* @param id Peer's id
* @param erase If true then erase the peer
*/
void
deletePeer(id_t id, bool erase);
private:
/** Add message/peer if we have not seen this message
* from the peer. A message is aged after IDLED seconds.
* Return true if added */
bool
addPeerMessage(uint256 const& key, id_t id);
hash_map<PublicKey, Slot<clock_type>> slots_;
SquelchHandler const& handler_; // squelch/unsquelch handler
Application& app_;
beast::Journal const journal_;
// Maintain aged container of message/peers. This is required
// to discard duplicate messages from the same peer. A message
// is aged after IDLED seconds. A message received IDLED seconds
// after it was relayed is ignored by PeerImp.
inline static messages peersWithMessage_{
beast::get_abstract_clock<clock_type>()};
};
template <typename clock_type>
bool
Slots<clock_type>::addPeerMessage(uint256 const& key, id_t id)
{
beast::expire(peersWithMessage_, squelch::IDLED);
if (key.isNonZero())
{
auto it = peersWithMessage_.find(key);
if (it == peersWithMessage_.end())
{
JLOG(journal_.trace())
<< "addPeerMessage: new " << to_string(key) << " " << id;
peersWithMessage_.emplace(key, std::unordered_set<id_t>{id});
return true;
}
if (it->second.find(id) != it->second.end())
{
JLOG(journal_.trace()) << "addPeerMessage: duplicate message "
<< to_string(key) << " " << id;
return false;
}
JLOG(journal_.trace())
<< "addPeerMessage: added " << to_string(key) << " " << id;
it->second.insert(id);
}
return true;
}
template <typename clock_type>
void
Slots<clock_type>::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
id_t id,
protocol::MessageType type)
{
if (!addPeerMessage(key, id))
return;
auto it = slots_.find(validator);
if (it == slots_.end())
{
JLOG(journal_.debug())
<< "updateSlotAndSquelch: new slot " << Slice(validator);
auto it = slots_
.emplace(std::make_pair(
validator,
Slot<clock_type>(handler_, app_.journal("Slot"))))
.first;
it->second.update(validator, id, type);
}
else
it->second.update(validator, id, type);
}
template <typename clock_type>
void
Slots<clock_type>::deletePeer(id_t id, bool erase)
{
for (auto& [validator, slot] : slots_)
slot.deletePeer(validator, id, erase);
}
template <typename clock_type>
void
Slots<clock_type>::deleteIdlePeers()
{
auto now = clock_type::now();
for (auto it = slots_.begin(); it != slots_.end();)
{
it->second.deleteIdlePeer(it->first);
if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE)
{
JLOG(journal_.debug())
<< "deleteIdlePeers: deleting idle slot " << Slice(it->first);
it = slots_.erase(it);
}
else
++it;
}
}
} // namespace squelch
} // namespace ripple
#endif // RIPPLE_OVERLAY_SLOT_H_INCLUDED
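
The selection step in Slot::update above is the heart of the algorithm.
Below is a self-contained sketch of just that step (standalone C++, not
rippled code; the idle-peer filtering that Slot::update also performs is
omitted):

#include <cstdint>
#include <iostream>
#include <iterator>
#include <random>
#include <unordered_set>

static constexpr std::uint16_t MAX_SELECTED_PEERS = 3;

// Randomly draw up to MAX_SELECTED_PEERS ids from the considered pool,
// removing each drawn id, much as Slot::update does with rand_int().
std::unordered_set<std::uint32_t>
selectPeers(std::unordered_set<std::uint32_t> considered, std::mt19937& gen)
{
    std::unordered_set<std::uint32_t> selected;
    while (selected.size() != MAX_SELECTED_PEERS && !considered.empty())
    {
        std::uniform_int_distribution<std::size_t> d(0, considered.size() - 1);
        auto it = std::next(considered.begin(), d(gen));
        selected.insert(*it);
        considered.erase(it);
    }
    return selected;
}

int main()
{
    std::mt19937 gen{std::random_device{}()};
    for (auto id : selectPeers({1, 2, 3, 4, 5}, gen))
        std::cout << id << " ";  // three distinct peer ids
}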


@@ -0,0 +1,122 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#include <ripple/basics/random.h>
#include <ripple/overlay/SquelchCommon.h>
#include <ripple/protocol/PublicKey.h>
#include <chrono>
#include <functional>
namespace ripple {
namespace squelch {
/** Maintains squelching of relaying messages from validators */
template <typename clock_type>
class Squelch
{
using time_point = typename clock_type::time_point;
public:
Squelch() = default;
virtual ~Squelch() = default;
/** Squelch/Unsquelch relaying for the validator
* @param validator The validator's public key
* @param squelch Squelch/unsquelch flag
* @param squelchDuration Squelch duration if squelch is true
*/
void
squelch(PublicKey const& validator, bool squelch, uint64_t squelchDuration);
/** Are the messages from this validator squelched
* @param validator Validator's public key
* @return true if squelched
*/
bool
isSquelched(PublicKey const& validator);
/** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and
* MAX_UNSQUELCH_EXPIRE */
static seconds
getSquelchDuration();
private:
/** Maintains the list of squelched validators and the expiration
* time of each squelch. The duration comes from the TMSquelch message. */
hash_map<PublicKey, time_point> squelched_;
};
template <typename clock_type>
void
Squelch<clock_type>::squelch(
PublicKey const& validator,
bool squelch,
uint64_t squelchDuration)
{
if (squelch)
{
squelched_[validator] = [squelchDuration]() {
seconds duration = seconds(squelchDuration);
return clock_type::now() +
((duration >= MIN_UNSQUELCH_EXPIRE &&
duration <= MAX_UNSQUELCH_EXPIRE)
? duration
: getSquelchDuration());
}();
}
else
squelched_.erase(validator);
}
template <typename clock_type>
bool
Squelch<clock_type>::isSquelched(PublicKey const& validator)
{
auto now = clock_type::now();
auto const& it = squelched_.find(validator);
if (it == squelched_.end())
return false;
else if (it->second > now)
return true;
squelched_.erase(it);
return false;
}
template <typename clock_type>
seconds
Squelch<clock_type>::getSquelchDuration()
{
auto d = seconds(ripple::rand_int(
MIN_UNSQUELCH_EXPIRE.count(), MAX_UNSQUELCH_EXPIRE.count()));
return d;
}
} // namespace squelch
} // namespace ripple
#endif  // RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
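
A hedged usage sketch from a peer's perspective; validator is a
placeholder PublicKey, and UptimeClock is the clock PeerImp instantiates
the class with later in this commit:

// Sketch: how PeerImp uses its squelch_ member.
squelch::Squelch<UptimeClock> squelch_;

// On TMSquelch from an upstream peer: start (or clear) a squelch.
// 350 lies within [MIN_UNSQUELCH_EXPIRE, MAX_UNSQUELCH_EXPIRE] seconds,
// so it is honored as-is; out-of-range values get a random duration.
squelch_.squelch(validator, true, 350);

// On send: drop this validator's messages while the squelch is active.
if (squelch_.isSquelched(validator))
    return;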


@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED
#define RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED
#include <chrono>
namespace ripple {
namespace squelch {
using namespace std::chrono;
// Peer's squelch is limited in time to
// rand{MIN_UNSQUELCH_EXPIRE, MAX_UNSQUELCH_EXPIRE}
static constexpr seconds MIN_UNSQUELCH_EXPIRE = seconds{300};
static constexpr seconds MAX_UNSQUELCH_EXPIRE = seconds{600};
// How long a peer may go without receiving a message before it is
// considered idled
static constexpr seconds IDLED = seconds{8};
// Message count thresholds used to select peers as the source
// of messages from the validator. Peers that reach
// MIN_MESSAGE_THRESHOLD are added to the considered pool; selection
// runs once MAX_SELECTED_PEERS of them reach MAX_MESSAGE_THRESHOLD.
static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9;
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10;
// Max selected peers to choose as the source of messages from validator
static constexpr uint16_t MAX_SELECTED_PEERS = 3;
// Wait this long after bootup before enabling the reduce-relay
// feature, to let the server establish peer connections
static constexpr minutes WAIT_ON_BOOTUP = minutes{10};
} // namespace squelch
} // namespace ripple
#endif  // RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED
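
Two compile-time sanity checks one could add to this header
(hypothetical; the commit does not assert these relationships, they are
only implied by Slot::update and getSquelchDuration):

static_assert(
    MIN_MESSAGE_THRESHOLD < MAX_MESSAGE_THRESHOLD,
    "counting must pass MIN_MESSAGE_THRESHOLD before MAX_MESSAGE_THRESHOLD");
static_assert(
    MIN_UNSQUELCH_EXPIRE < MAX_UNSQUELCH_EXPIRE,
    "the squelch duration range must be non-empty");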


@@ -23,8 +23,12 @@
namespace ripple {
Message::Message(::google::protobuf::Message const& message, int type)
Message::Message(
::google::protobuf::Message const& message,
int type,
boost::optional<PublicKey> const& validator)
: category_(TrafficCount::categorize(message, type, false))
, validatorKey_(validator)
{
using namespace ripple::compression;


@@ -103,6 +103,9 @@ OverlayImpl::Timer::on_timer(error_code ec)
if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0)
overlay_.check();
if ((overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
overlay_.deleteIdlePeers();
timer_.expires_from_now(std::chrono::seconds(1));
timer_.async_wait(overlay_.strand_.wrap(std::bind(
&Timer::on_timer, shared_from_this(), std::placeholders::_1)));
@@ -139,6 +142,7 @@ OverlayImpl::OverlayImpl(
, m_resolver(resolver)
, next_id_(1)
, timer_count_(0)
, slots_(app, *this)
, m_stats(
std::bind(&OverlayImpl::collect_metrics, this),
collector,
@@ -1216,7 +1220,7 @@ OverlayImpl::check()
}
std::shared_ptr<Peer>
OverlayImpl::findPeerByShortID(Peer::id_t const& id)
OverlayImpl::findPeerByShortID(Peer::id_t const& id) const
{
std::lock_guard lock(mutex_);
auto const iter = ids_.find(id);
@@ -1249,18 +1253,23 @@ OverlayImpl::broadcast(protocol::TMProposeSet& m)
for_each([&](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
}
void
OverlayImpl::relay(protocol::TMProposeSet& m, uint256 const& uid)
std::set<Peer::id_t>
OverlayImpl::relay(
protocol::TMProposeSet& m,
uint256 const& uid,
PublicKey const& validator)
{
if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
{
auto const sm =
std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
for_each([&](std::shared_ptr<PeerImp>&& p) {
if (toSkip->find(p->id()) == toSkip->end())
p->send(sm);
});
return *toSkip;
}
return {};
}
void
@@ -1270,17 +1279,23 @@ OverlayImpl::broadcast(protocol::TMValidation& m)
for_each([sm](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
}
void
OverlayImpl::relay(protocol::TMValidation& m, uint256 const& uid)
std::set<Peer::id_t>
OverlayImpl::relay(
protocol::TMValidation& m,
uint256 const& uid,
PublicKey const& validator)
{
if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
{
auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
auto const sm =
std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
for_each([&](std::shared_ptr<PeerImp>&& p) {
if (toSkip->find(p->id()) == toSkip->end())
p->send(sm);
});
return *toSkip;
}
return {};
}
//------------------------------------------------------------------------------
@@ -1352,6 +1367,100 @@ OverlayImpl::sendEndpoints()
}
}
std::shared_ptr<Message>
makeSquelchMessage(
PublicKey const& validator,
bool squelch,
uint64_t squelchDuration)
{
protocol::TMSquelch m;
m.set_squelch(squelch);
m.set_validatorpubkey(validator.data(), validator.size());
if (squelch)
m.set_squelchduration(squelchDuration);
return std::make_shared<Message>(m, protocol::mtSQUELCH);
}
void
OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
{
if (auto peer = findPeerByShortID(id);
peer && app_.config().REDUCE_RELAY_SQUELCH)
{
// optimize - multiple messages with different
// validators might be sent to the same peer
auto m = makeSquelchMessage(validator, false, 0);
peer->send(m);
}
}
void
OverlayImpl::squelch(
PublicKey const& validator,
Peer::id_t id,
uint32_t squelchDuration) const
{
if (auto peer = findPeerByShortID(id);
peer && app_.config().REDUCE_RELAY_SQUELCH)
{
auto m = makeSquelchMessage(validator, true, squelchDuration);
peer->send(m);
}
}
void
OverlayImpl::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
std::set<Peer::id_t>&& peers,
protocol::MessageType type)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
[this, key, validator, peers = std::move(peers), type]() mutable {
updateSlotAndSquelch(key, validator, std::move(peers), type);
});
for (auto id : peers)
slots_.updateSlotAndSquelch(key, validator, id, type);
}
void
OverlayImpl::updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer,
protocol::MessageType type)
{
if (!strand_.running_in_this_thread())
return post(strand_, [this, key, validator, peer, type]() {
updateSlotAndSquelch(key, validator, peer, type);
});
slots_.updateSlotAndSquelch(key, validator, peer, type);
}
void
OverlayImpl::deletePeer(Peer::id_t id)
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
slots_.deletePeer(id, true);
}
void
OverlayImpl::deleteIdlePeers()
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this));
slots_.deleteIdlePeers();
}
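
The "if (!strand_.running_in_this_thread()) return post(...)" pattern
above re-dispatches the call so that slots_ is only ever touched from
the overlay's strand. A self-contained sketch of the idiom (plain
Boost.Asio, not rippled code):

#include <boost/asio.hpp>

// Counter::increment is safe to call from any thread: if the caller is
// not on the strand, the same call is reposted onto it.
class Counter
{
public:
    explicit Counter(boost::asio::io_context& ioc) : strand_(ioc)
    {
    }

    void
    increment()
    {
        if (!strand_.running_in_this_thread())
            return boost::asio::post(strand_, [this] { increment(); });
        ++count_;  // only ever executed on the strand
    }

private:
    boost::asio::io_context::strand strand_;
    int count_ = 0;
};

int main()
{
    boost::asio::io_context ioc;
    Counter c{ioc};
    c.increment();  // reposted onto the strand
    ioc.run();      // runs the posted increment
}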
//------------------------------------------------------------------------------
Overlay::Setup
setup_Overlay(BasicConfig const& config)
{


@@ -26,6 +26,7 @@
#include <ripple/basics/chrono.h>
#include <ripple/core/Job.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/Slot.h>
#include <ripple/overlay/impl/Handshake.h>
#include <ripple/overlay/impl/TrafficCount.h>
#include <ripple/peerfinder/PeerfinderManager.h>
@@ -52,7 +53,7 @@ namespace ripple {
class PeerImp;
class BasicConfig;
class OverlayImpl : public Overlay
class OverlayImpl : public Overlay, public squelch::SquelchHandler
{
public:
class Child
@@ -124,6 +125,8 @@ private:
boost::optional<std::uint32_t> networkID_;
squelch::Slots<UptimeClock> slots_;
//--------------------------------------------------------------------------
public:
@@ -195,7 +198,7 @@ public:
void checkSanity(std::uint32_t) override;
std::shared_ptr<Peer>
findPeerByShortID(Peer::id_t const& id) override;
findPeerByShortID(Peer::id_t const& id) const override;
std::shared_ptr<Peer>
findPeerByPublicKey(PublicKey const& pubKey) override;
@@ -206,11 +209,17 @@ public:
void
broadcast(protocol::TMValidation& m) override;
void
relay(protocol::TMProposeSet& m, uint256 const& uid) override;
std::set<Peer::id_t>
relay(
protocol::TMProposeSet& m,
uint256 const& uid,
PublicKey const& validator) override;
void
relay(protocol::TMValidation& m, uint256 const& uid) override;
std::set<Peer::id_t>
relay(
protocol::TMValidation& m,
uint256 const& uid,
PublicKey const& validator) override;
//--------------------------------------------------------------------------
//
@@ -364,7 +373,49 @@ public:
void
lastLink(std::uint32_t id);
/** Updates message count for validator/peer. Sends TMSquelch if the number
* of messages for N peers reaches threshold T. A message is counted
* if a peer receives the message for the first time and if
* the message has been relayed.
* @param key Unique message's key
* @param validator Validator's public key
* @param peers Peers' id to update the slots for
* @param type Received protocol message type
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
std::set<Peer::id_t>&& peers,
protocol::MessageType type);
/** Overload to reduce allocation in case of single peer
*/
void
updateSlotAndSquelch(
uint256 const& key,
PublicKey const& validator,
Peer::id_t peer,
protocol::MessageType type);
/** Called when the peer is deleted. If the peer was selected to be the
* source of messages from the validator then squelched peers have to be
* unsquelched.
* @param id Peer's id
*/
void
deletePeer(Peer::id_t id);
private:
void
squelch(
PublicKey const& validator,
Peer::id_t const id,
std::uint32_t squelchDuration) const override;
void
unsquelch(PublicKey const& validator, Peer::id_t id) const override;
std::shared_ptr<Writer>
makeRedirectResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,
@@ -481,6 +532,11 @@ private:
void
sendEndpoints();
/** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator */
void
deleteIdlePeers();
private:
struct TrafficGauges
{


@@ -100,6 +100,7 @@ PeerImp::~PeerImp()
{
const bool inCluster{cluster()};
overlay_.deletePeer(id_);
if (state_ == State::active)
overlay_.onPeerDeactivate(id_);
overlay_.peerFinder().on_closed(slot_);
@@ -226,6 +227,10 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (detaching_)
return;
auto validator = m->getValidatorKey();
if (validator && squelch_.isSquelched(*validator))
return;
overlay_.reportTraffic(
safe_cast<TrafficCount::category>(m->getCategory()),
false,
@@ -1647,8 +1652,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
publicKey.slice(),
sig);
if (!app_.getHashRouter().addSuppressionPeer(suppression, id_))
if (auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has its own 'HashRouter') that a peer
// receives within IDLED seconds since the message has been relayed.
// Wait WAIT_ON_BOOTUP time to let the server establish connections to
// peers.
if (app_.config().REDUCE_RELAY_ENABLE && relayed &&
(stopwatch().now() - *relayed) < squelch::IDLED &&
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
JLOG(p_journal_.trace()) << "Proposal: duplicate";
return;
}
@@ -2146,9 +2163,21 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
if (!app_.getHashRouter().addSuppressionPeer(
sha512Half(makeSlice(m->validation())), id_))
auto key = sha512Half(makeSlice(m->validation()));
if (auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
!added)
{
// Count unique messages (Slots has its own 'HashRouter') that a
// peer receives within IDLED seconds since the message has been
// relayed. Wait WAIT_ON_BOOTUP time to let the server establish
// connections to peers.
if (app_.config().REDUCE_RELAY_ENABLE && relayed &&
(stopwatch().now() - *relayed) < squelch::IDLED &&
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
JLOG(p_journal_.trace()) << "Validation: duplicate";
return;
}
@@ -2324,6 +2353,45 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
}
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
{
if (!m->has_validatorpubkey())
{
charge(Resource::feeBadData);
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
{
charge(Resource::feeBadData);
return;
}
PublicKey key(slice);
auto squelch = m->squelch();
auto duration = m->has_squelchduration() ? m->squelchduration() : 0;
auto sp = shared_from_this();
// Ignore the squelch for validator's own messages.
if (key == app_.getValidationPublicKey())
{
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch discarding validator's squelch " << slice;
return;
}
if (!strand_.running_in_this_thread())
return post(strand_, [sp, key, squelch, duration]() {
sp->squelch_.squelch(key, squelch, duration);
});
JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
squelch_.squelch(key, squelch, duration);
}
//--------------------------------------------------------------------------
void
@@ -2478,7 +2546,22 @@ PeerImp::checkPropose(
relay = app_.config().RELAY_UNTRUSTED_PROPOSALS || cluster();
if (relay)
app_.overlay().relay(*packet, peerPos.suppressionID());
{
// haveMessage contains peers that are suppressed; i.e. the peers
// are the source of the message, consequently the message should
// not be relayed to these peers. But the message must be counted
// as part of the squelch logic.
auto haveMessage = app_.overlay().relay(
*packet, peerPos.suppressionID(), peerPos.publicKey());
if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() &&
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
overlay_.updateSlotAndSquelch(
peerPos.suppressionID(),
peerPos.publicKey(),
std::move(haveMessage),
protocol::mtPROPOSE_LEDGER);
}
}
void
@@ -2501,7 +2584,22 @@ PeerImp::checkValidation(
{
auto const suppression =
sha512Half(makeSlice(val->getSerialized()));
overlay_.relay(*packet, suppression);
// haveMessage contains peers that are suppressed; i.e. the peers
// are the source of the message, consequently the message should
// not be relayed to these peers. But the message must be counted
// as part of the squelch logic.
auto haveMessage =
overlay_.relay(*packet, suppression, val->getSignerPublic());
if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() &&
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
{
overlay_.updateSlotAndSquelch(
suppression,
val->getSignerPublic(),
std::move(haveMessage),
protocol::mtVALIDATION);
}
}
}
catch (std::exception const&)


@@ -24,6 +24,7 @@
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/overlay/Squelch.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
#include <ripple/overlay/impl/ProtocolVersion.h>
@@ -142,6 +143,8 @@ private:
clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_;
squelch::Squelch<UptimeClock> squelch_;
// Notes on thread locking:
//
// During an audit it was noted that some member variables that looked
@@ -549,6 +552,8 @@ public:
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
void
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
private:
State


@@ -77,6 +77,8 @@ protocolMessageName(int type)
return "validation";
case protocol::mtGET_OBJECTS:
return "get_objects";
case protocol::mtSQUELCH:
return "squelch";
default:
break;
}
@@ -375,6 +377,10 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler)
success = detail::invoke<protocol::TMGetObjectByHash>(
*header, buffers, handler);
break;
case protocol::mtSQUELCH:
success =
detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
break;
default:
handler.onMessageUnknown(header->message_type);
success = true;


@@ -71,6 +71,9 @@ enum {
/** How often to log send queue size */
sendQueueLogFreq = 64,
/** How often we check for idle peers (seconds) */
checkIdlePeers = 4,
};
/** The threshold above which we treat a peer connection as high latency */


@@ -23,6 +23,7 @@ enum MessageType
mtGET_PEER_SHARD_INFO = 52;
mtPEER_SHARD_INFO = 53;
mtVALIDATORLIST = 54;
mtSQUELCH = 55;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -356,3 +357,10 @@ message TMPing
optional uint64 netTime = 4;
}
message TMSquelch
{
required bool squelch = 1; // squelch if true, otherwise unsquelch
required bytes validatorPubKey = 2; // validator's public key
optional uint32 squelchDuration = 3; // squelch duration in milliseconds
}
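
For illustration, a round-trip sketch using the generated protobuf
accessors (SerializeAsString and ParseFromString are standard protobuf
methods, not part of this commit; key is a placeholder validator key):

protocol::TMSquelch out;
out.set_squelch(true);
out.set_validatorpubkey(key.data(), key.size());
out.set_squelchduration(350000);  // milliseconds, per the comment above

std::string const wire = out.SerializeAsString();

protocol::TMSquelch in;
if (in.ParseFromString(wire) && in.has_validatorpubkey() && in.squelch())
{
    // validatorpubkey() and has_squelchduration() are the same
    // accessors PeerImp::onMessage uses above.
}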


@@ -191,7 +191,7 @@ class HashRouter_test : public beast::unit_test::suite
uint256 const key1(1);
boost::optional<std::set<HashRouter::PeerShortID>> peers;
std::optional<std::set<HashRouter::PeerShortID>> peers;
peers = router.shouldRelay(key1);
BEAST_EXPECT(peers && peers->empty());

File diff suppressed because it is too large