Add experimental validation & proposal relay reduction support:

- Add validation/proposal reduce-relay feature negotiation to
  the handshake
- Make squelch duration proportional to a number of peers that
  can be squelched
- Refactor makeRequest()/makeResponse() to facilitate handshake
  unit-testing
- Fix compression enable flag for inbound peer
- Fix compression algorithm parsing in the header parser
- Fix squelch duration in onMessage(TMSquelch)

This commit fixes 3624, fixes 3639 and fixes 3641
This commit is contained in:
Gregory Tsipenyuk
2020-10-21 12:41:04 -04:00
committed by Nik Bougalis
parent 44fe0e1fc4
commit 74d96ff4bd
22 changed files with 1076 additions and 260 deletions

View File

@@ -859,6 +859,7 @@ target_sources (rippled PRIVATE
src/test/overlay/short_read_test.cpp src/test/overlay/short_read_test.cpp
src/test/overlay/compression_test.cpp src/test/overlay/compression_test.cpp
src/test/overlay/reduce_relay_test.cpp src/test/overlay/reduce_relay_test.cpp
src/test/overlay/handshake_test.cpp
#[===============================[ #[===============================[
test sources: test sources:
subdir: peerfinder subdir: peerfinder

View File

@@ -168,6 +168,7 @@ else
'ripple.tx.OversizeMeta' 'ripple.tx.OversizeMeta'
'ripple.consensus.DistributedValidators' 'ripple.consensus.DistributedValidators'
'ripple.app.NoRippleCheckLimits' 'ripple.app.NoRippleCheckLimits'
'ripple.ripple_data.compression'
'ripple.NodeStore.Timing' 'ripple.NodeStore.Timing'
'ripple.consensus.ByzantineFailureSim' 'ripple.consensus.ByzantineFailureSim'
'beast.chrono.abstract_clock' 'beast.chrono.abstract_clock'

View File

@@ -37,6 +37,19 @@ namespace rfc2616 {
namespace detail { namespace detail {
struct ci_equal_pred
{
explicit ci_equal_pred() = default;
bool
operator()(char c1, char c2)
{
// VFALCO TODO Use a table lookup here
return std::tolower(static_cast<unsigned char>(c1)) ==
std::tolower(static_cast<unsigned char>(c2));
}
};
/** Returns `true` if `c` is linear white space. /** Returns `true` if `c` is linear white space.
This excludes the CRLF sequence allowed for line continuations. This excludes the CRLF sequence allowed for line continuations.
@@ -195,6 +208,179 @@ split_commas(boost::beast::string_view const& s)
return split_commas(s.begin(), s.end()); return split_commas(s.begin(), s.end());
} }
//------------------------------------------------------------------------------
/** Iterates through a comma separated list.
Meets the requirements of ForwardIterator.
List defined in rfc2616 2.1.
@note Values returned may contain backslash escapes.
*/
class list_iterator
{
using iter_type = boost::string_ref::const_iterator;
iter_type it_;
iter_type end_;
boost::string_ref value_;
public:
using value_type = boost::string_ref;
using pointer = value_type const*;
using reference = value_type const&;
using difference_type = std::ptrdiff_t;
using iterator_category = std::forward_iterator_tag;
list_iterator(iter_type begin, iter_type end) : it_(begin), end_(end)
{
if (it_ != end_)
increment();
}
bool
operator==(list_iterator const& other) const
{
return other.it_ == it_ && other.end_ == end_ &&
other.value_.size() == value_.size();
}
bool
operator!=(list_iterator const& other) const
{
return !(*this == other);
}
reference
operator*() const
{
return value_;
}
pointer
operator->() const
{
return &*(*this);
}
list_iterator&
operator++()
{
increment();
return *this;
}
list_iterator
operator++(int)
{
auto temp = *this;
++(*this);
return temp;
}
private:
template <class = void>
void
increment();
};
template <class>
void
list_iterator::increment()
{
using namespace detail;
value_.clear();
while (it_ != end_)
{
if (*it_ == '"')
{
// quoted-string
++it_;
if (it_ == end_)
return;
if (*it_ != '"')
{
auto start = it_;
for (;;)
{
++it_;
if (it_ == end_)
{
value_ = boost::string_ref(
&*start, std::distance(start, it_));
return;
}
if (*it_ == '"')
{
value_ = boost::string_ref(
&*start, std::distance(start, it_));
++it_;
return;
}
}
}
++it_;
}
else if (*it_ == ',')
{
it_++;
continue;
}
else if (is_lws(*it_))
{
++it_;
continue;
}
else
{
auto start = it_;
for (;;)
{
++it_;
if (it_ == end_ || *it_ == ',' || is_lws(*it_))
{
value_ =
boost::string_ref(&*start, std::distance(start, it_));
return;
}
}
}
}
}
/** Returns true if two strings are equal.
A case-insensitive comparison is used.
*/
inline bool
ci_equal(boost::string_ref s1, boost::string_ref s2)
{
return boost::range::equal(s1, s2, detail::ci_equal_pred{});
}
/** Returns a range representing the list. */
inline boost::iterator_range<list_iterator>
make_list(boost::string_ref const& field)
{
return boost::iterator_range<list_iterator>{
list_iterator{field.begin(), field.end()},
list_iterator{field.end(), field.end()}};
}
/** Returns true if the specified token exists in the list.
A case-insensitive comparison is used.
*/
template <class = void>
bool
token_in_list(boost::string_ref const& value, boost::string_ref const& token)
{
for (auto const& item : make_list(value))
if (ci_equal(item, token))
return true;
return false;
}
template <bool isRequest, class Body, class Fields> template <bool isRequest, class Body, class Fields>
bool bool
is_keep_alive(boost::beast::http::message<isRequest, Body, Fields> const& m) is_keep_alive(boost::beast::http::message<isRequest, Body, Fields> const& m)

View File

@@ -191,10 +191,18 @@ public:
std::size_t WORKERS = 0; std::size_t WORKERS = 0;
// Reduce-relay - these parameters are experimental. // Reduce-relay - these parameters are experimental.
// Enable reduce-relay functionality // Enable reduce-relay features
bool REDUCE_RELAY_ENABLE = false; // Validation/proposal reduce-relay feature
// Send squelch message to peers bool VP_REDUCE_RELAY_ENABLE = false;
bool REDUCE_RELAY_SQUELCH = false; // Send squelch message to peers. Generally this config should
// have the same value as VP_REDUCE_RELAY_ENABLE. It can be
// used for testing the feature's function without
// affecting the message relaying. To use it for testing,
// set it to false and set VP_REDUCE_RELAY_ENABLE to true.
// Squelch messages will not be sent to the peers in this case.
// Set log level to debug so that the feature function can be
// analyzed.
bool VP_REDUCE_RELAY_SQUELCH = false;
// These override the command line client settings // These override the command line client settings
boost::optional<beast::IP::Endpoint> rpc_ip; boost::optional<beast::IP::Endpoint> rpc_ip;

View File

@@ -523,8 +523,8 @@ Config::loadFromString(std::string const& fileContents)
if (exists(SECTION_REDUCE_RELAY)) if (exists(SECTION_REDUCE_RELAY))
{ {
auto sec = section(SECTION_REDUCE_RELAY); auto sec = section(SECTION_REDUCE_RELAY);
REDUCE_RELAY_ENABLE = sec.value_or("enable", false); VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false);
REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false); VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false);
} }
if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_)) if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_))

View File

@@ -75,7 +75,7 @@ public:
beast::IP::Address public_ip; beast::IP::Address public_ip;
int ipLimit = 0; int ipLimit = 0;
std::uint32_t crawlOptions = 0; std::uint32_t crawlOptions = 0;
boost::optional<std::uint32_t> networkID; std::optional<std::uint32_t> networkID;
bool vlEnabled = true; bool vlEnabled = true;
}; };

View File

@@ -17,22 +17,26 @@
*/ */
//============================================================================== //==============================================================================
#ifndef RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED #ifndef RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED
#define RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED #define RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED
#include <chrono> #include <chrono>
namespace ripple { namespace ripple {
namespace squelch { namespace reduce_relay {
using namespace std::chrono;
// Peer's squelch is limited in time to // Peer's squelch is limited in time to
// rand{MIN_UNSQUELCH_EXPIRE, MAX_UNSQUELCH_EXPIRE} // rand{MIN_UNSQUELCH_EXPIRE, max_squelch},
static constexpr seconds MIN_UNSQUELCH_EXPIRE = seconds{300}; // where max_squelch is
static constexpr seconds MAX_UNSQUELCH_EXPIRE = seconds{600}; // min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * number_of_peers),
// MAX_UNSQUELCH_EXPIRE_PEERS)
static constexpr auto MIN_UNSQUELCH_EXPIRE = std::chrono::seconds{300};
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600};
static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10);
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600};
// No message received threshold before identifying a peer as idled // No message received threshold before identifying a peer as idled
static constexpr seconds IDLED = seconds{8}; static constexpr auto IDLED = std::chrono::seconds{8};
// Message count threshold to start selecting peers as the source // Message count threshold to start selecting peers as the source
// of messages from the validator. We add peers who reach // of messages from the validator. We add peers who reach
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS // MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
@@ -40,13 +44,13 @@ static constexpr seconds IDLED = seconds{8};
static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9; static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9;
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10; static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10;
// Max selected peers to choose as the source of messages from validator // Max selected peers to choose as the source of messages from validator
static constexpr uint16_t MAX_SELECTED_PEERS = 3; static constexpr uint16_t MAX_SELECTED_PEERS = 5;
// Wait before reduce-relay feature is enabled on boot up to let // Wait before reduce-relay feature is enabled on boot up to let
// the server establish peer connections // the server establish peer connections
static constexpr minutes WAIT_ON_BOOTUP = minutes{10}; static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};
} // namespace squelch } // namespace reduce_relay
} // namespace ripple } // namespace ripple
#endif // RIPPLED_SQUELCHCOMMON_H #endif // RIPPLED_REDUCERELAYCOMMON_H_INCLUDED

View File

@@ -25,8 +25,8 @@
#include <ripple/beast/container/aged_unordered_map.h> #include <ripple/beast/container/aged_unordered_map.h>
#include <ripple/beast/utility/Journal.h> #include <ripple/beast/utility/Journal.h>
#include <ripple/overlay/Peer.h> #include <ripple/overlay/Peer.h>
#include <ripple/overlay/ReduceRelayCommon.h>
#include <ripple/overlay/Squelch.h> #include <ripple/overlay/Squelch.h>
#include <ripple/overlay/SquelchCommon.h>
#include <ripple/protocol/PublicKey.h> #include <ripple/protocol/PublicKey.h>
#include <ripple.pb.h> #include <ripple.pb.h>
@@ -40,7 +40,7 @@
namespace ripple { namespace ripple {
namespace squelch { namespace reduce_relay {
template <typename clock_type> template <typename clock_type>
class Slots; class Slots;
@@ -61,7 +61,7 @@ template <typename Unit, typename TP>
Unit Unit
epoch(TP const& t) epoch(TP const& t)
{ {
return duration_cast<Unit>(t.time_since_epoch()); return std::chrono::duration_cast<Unit>(t.time_since_epoch());
} }
/** Abstract class. Declares squelch and unsquelch handlers. /** Abstract class. Declares squelch and unsquelch handlers.
@@ -124,10 +124,10 @@ private:
/** Update peer info. If the message is from a new /** Update peer info. If the message is from a new
* peer or from a previously expired squelched peer then switch * peer or from a previously expired squelched peer then switch
* the peer's and slot's state to Counting. If time of last * the peer's and slot's state to Counting. If time of last
* selection round is > 2 * MAX_UNSQUELCH_EXPIRE then switch the slot's * selection round is > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT then switch the
* state to Counting. If the number of messages for the peer * slot's state to Counting. If the number of messages for the peer is >
* is > MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. * MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
* If the number of considered peers who reached MAX_MESSAGE_THRESHOLD is * number of considered peers who reached MAX_MESSAGE_THRESHOLD is
* MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from * MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from
* considered peers, and call squelch handler for each peer, which is not * considered peers, and call squelch handler for each peer, which is not
* selected and not already in Squelched state. Set the state for those * selected and not already in Squelched state. Set the state for those
@@ -197,6 +197,14 @@ private:
void void
deleteIdlePeer(PublicKey const& validator); deleteIdlePeer(PublicKey const& validator);
/** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and
* min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * npeers),
* MAX_UNSQUELCH_EXPIRE_PEERS)
* @param npeers number of peers that can be squelched in the Slot
*/
std::chrono::seconds
getSquelchDuration(std::size_t npeers);
private: private:
/** Reset counts of peers in Selected or Counting state */ /** Reset counts of peers in Selected or Counting state */
void void
@@ -231,6 +239,7 @@ template <typename clock_type>
void void
Slot<clock_type>::deleteIdlePeer(PublicKey const& validator) Slot<clock_type>::deleteIdlePeer(PublicKey const& validator)
{ {
using namespace std::chrono;
auto now = clock_type::now(); auto now = clock_type::now();
for (auto it = peers_.begin(); it != peers_.end();) for (auto it = peers_.begin(); it != peers_.end();)
{ {
@@ -239,7 +248,7 @@ Slot<clock_type>::deleteIdlePeer(PublicKey const& validator)
++it; ++it;
if (now - peer.lastMessage > IDLED) if (now - peer.lastMessage > IDLED)
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "deleteIdlePeer: " << Slice(validator) << " " << id << "deleteIdlePeer: " << Slice(validator) << " " << id
<< " idled " << " idled "
<< duration_cast<seconds>(now - peer.lastMessage).count() << duration_cast<seconds>(now - peer.lastMessage).count()
@@ -256,12 +265,13 @@ Slot<clock_type>::update(
id_t id, id_t id,
protocol::MessageType type) protocol::MessageType type)
{ {
using namespace std::chrono;
auto now = clock_type::now(); auto now = clock_type::now();
auto it = peers_.find(id); auto it = peers_.find(id);
// First message from this peer // First message from this peer
if (it == peers_.end()) if (it == peers_.end())
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "update: adding peer " << Slice(validator) << " " << id; << "update: adding peer " << Slice(validator) << " " << id;
peers_.emplace( peers_.emplace(
std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now})); std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
@@ -271,7 +281,7 @@ Slot<clock_type>::update(
// Message from a peer with expired squelch // Message from a peer with expired squelch
if (it->second.state == PeerState::Squelched && now > it->second.expire) if (it->second.state == PeerState::Squelched && now > it->second.expire)
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "update: squelch expired " << Slice(validator) << " " << id; << "update: squelch expired " << Slice(validator) << " " << id;
it->second.state = PeerState::Counting; it->second.state = PeerState::Counting;
it->second.lastMessage = now; it->second.lastMessage = now;
@@ -281,7 +291,7 @@ Slot<clock_type>::update(
auto& peer = it->second; auto& peer = it->second;
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "update: existing peer " << Slice(validator) << " " << id << "update: existing peer " << Slice(validator) << " " << id
<< " slot state " << static_cast<int>(state_) << " peer state " << " slot state " << static_cast<int>(state_) << " peer state "
<< static_cast<int>(peer.state) << " count " << peer.count << " last " << static_cast<int>(peer.state) << " count " << peer.count << " last "
@@ -299,9 +309,9 @@ Slot<clock_type>::update(
if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) if (peer.count == (MAX_MESSAGE_THRESHOLD + 1))
++reachedThreshold_; ++reachedThreshold_;
if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE) if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT)
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "update: resetting due to inactivity " << Slice(validator) << " " << "update: resetting due to inactivity " << Slice(validator) << " "
<< id << " " << duration_cast<seconds>(now - lastSelected_).count(); << id << " " << duration_cast<seconds>(now - lastSelected_).count();
initCounting(); initCounting();
@@ -338,7 +348,7 @@ Slot<clock_type>::update(
if (selected.size() != MAX_SELECTED_PEERS) if (selected.size() != MAX_SELECTED_PEERS)
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "update: selection failed " << Slice(validator) << " " << id; << "update: selection failed " << Slice(validator) << " " << id;
initCounting(); initCounting();
return; return;
@@ -347,11 +357,13 @@ Slot<clock_type>::update(
lastSelected_ = now; lastSelected_ = now;
auto s = selected.begin(); auto s = selected.begin();
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "update: " << Slice(validator) << " " << id << " pool size " << "update: " << Slice(validator) << " " << id << " pool size "
<< consideredPoolSize << " selected " << *s << " " << consideredPoolSize << " selected " << *s << " "
<< *std::next(s, 1) << " " << *std::next(s, 2); << *std::next(s, 1) << " " << *std::next(s, 2);
assert(peers_.size() >= MAX_SELECTED_PEERS);
// squelch peers which are not selected and // squelch peers which are not selected and
// not already squelched // not already squelched
std::stringstream str; std::stringstream str;
@@ -363,18 +375,16 @@ Slot<clock_type>::update(
v.state = PeerState::Selected; v.state = PeerState::Selected;
else if (v.state != PeerState::Squelched) else if (v.state != PeerState::Squelched)
{ {
if (journal_.debug()) if (journal_.trace())
str << k << " "; str << k << " ";
v.state = PeerState::Squelched; v.state = PeerState::Squelched;
auto duration = Squelch<clock_type>::getSquelchDuration(); std::chrono::seconds duration =
getSquelchDuration(peers_.size() - MAX_SELECTED_PEERS);
v.expire = now + duration; v.expire = now + duration;
handler_.squelch( handler_.squelch(validator, k, duration.count());
validator,
k,
duration_cast<milliseconds>(duration).count());
} }
} }
JLOG(journal_.debug()) << "update: squelching " << Slice(validator) JLOG(journal_.trace()) << "update: squelching " << Slice(validator)
<< " " << id << " " << str.str(); << " " << id << " " << str.str();
considered_.clear(); considered_.clear();
reachedThreshold_ = 0; reachedThreshold_ = 0;
@@ -382,6 +392,22 @@ Slot<clock_type>::update(
} }
} }
template <typename clock_type>
std::chrono::seconds
Slot<clock_type>::getSquelchDuration(std::size_t npeers)
{
using namespace std::chrono;
auto m = std::max(
MAX_UNSQUELCH_EXPIRE_DEFAULT, seconds{SQUELCH_PER_PEER * npeers});
if (m > MAX_UNSQUELCH_EXPIRE_PEERS)
{
m = MAX_UNSQUELCH_EXPIRE_PEERS;
JLOG(journal_.warn())
<< "getSquelchDuration: unexpected squelch duration " << npeers;
}
return seconds{ripple::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)};
}
template <typename clock_type> template <typename clock_type>
void void
Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase) Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase)
@@ -389,7 +415,7 @@ Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase)
auto it = peers_.find(id); auto it = peers_.find(id);
if (it != peers_.end()) if (it != peers_.end())
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "deletePeer: " << Slice(validator) << " " << id << " selected " << "deletePeer: " << Slice(validator) << " " << id << " selected "
<< (it->second.state == PeerState::Selected) << " considered " << (it->second.state == PeerState::Selected) << " considered "
<< (considered_.find(id) != considered_.end()) << " erase " << (considered_.find(id) != considered_.end()) << " erase "
@@ -486,6 +512,7 @@ std::unordered_map<
std::tuple<PeerState, uint16_t, uint32_t, uint32_t>> std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
Slot<clock_type>::getPeers() const Slot<clock_type>::getPeers() const
{ {
using namespace std::chrono;
auto init = std::unordered_map< auto init = std::unordered_map<
id_t, id_t,
std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>(); std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
@@ -531,7 +558,6 @@ public:
* @param key Message's hash * @param key Message's hash
* @param validator Validator's public key * @param validator Validator's public key
* @param id Peer's id which received the message * @param id Peer's id which received the message
* @param id Peer's pointer which received the message
* @param type Received protocol message type * @param type Received protocol message type
*/ */
void void
@@ -643,7 +669,7 @@ template <typename clock_type>
bool bool
Slots<clock_type>::addPeerMessage(uint256 const& key, id_t id) Slots<clock_type>::addPeerMessage(uint256 const& key, id_t id)
{ {
beast::expire(peersWithMessage_, squelch::IDLED); beast::expire(peersWithMessage_, reduce_relay::IDLED);
if (key.isNonZero()) if (key.isNonZero())
{ {
@@ -686,7 +712,7 @@ Slots<clock_type>::updateSlotAndSquelch(
auto it = slots_.find(validator); auto it = slots_.find(validator);
if (it == slots_.end()) if (it == slots_.end())
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "updateSlotAndSquelch: new slot " << Slice(validator); << "updateSlotAndSquelch: new slot " << Slice(validator);
auto it = slots_ auto it = slots_
.emplace(std::make_pair( .emplace(std::make_pair(
@@ -716,9 +742,9 @@ Slots<clock_type>::deleteIdlePeers()
for (auto it = slots_.begin(); it != slots_.end();) for (auto it = slots_.begin(); it != slots_.end();)
{ {
it->second.deleteIdlePeer(it->first); it->second.deleteIdlePeer(it->first);
if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE) if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
{ {
JLOG(journal_.debug()) JLOG(journal_.trace())
<< "deleteIdlePeers: deleting idle slot " << Slice(it->first); << "deleteIdlePeers: deleting idle slot " << Slice(it->first);
it = slots_.erase(it); it = slots_.erase(it);
} }
@@ -727,7 +753,7 @@ Slots<clock_type>::deleteIdlePeers()
} }
} }
} // namespace squelch } // namespace reduce_relay
} // namespace ripple } // namespace ripple

View File

@@ -21,15 +21,17 @@
#define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED #define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED
#include <ripple/basics/random.h> #include <ripple/basics/random.h>
#include <ripple/overlay/SquelchCommon.h> #include <ripple/beast/utility/Journal.h>
#include <ripple/overlay/ReduceRelayCommon.h>
#include <ripple/protocol/PublicKey.h> #include <ripple/protocol/PublicKey.h>
#include <algorithm>
#include <chrono> #include <chrono>
#include <functional> #include <functional>
namespace ripple { namespace ripple {
namespace squelch { namespace reduce_relay {
/** Maintains squelching of relaying messages from validators */ /** Maintains squelching of relaying messages from validators */
template <typename clock_type> template <typename clock_type>
@@ -38,84 +40,89 @@ class Squelch
using time_point = typename clock_type::time_point; using time_point = typename clock_type::time_point;
public: public:
Squelch() = default; explicit Squelch(beast::Journal journal) : journal_(journal)
{
}
virtual ~Squelch() = default; virtual ~Squelch() = default;
/** Squelch/Unsquelch relaying for the validator /** Squelch validation/proposal relaying for the validator
* @param validator The validator's public key * @param validator The validator's public key
* @param squelch Squelch/unsquelch flag * @param squelchDuration Squelch duration in seconds
* @param squelchDuration Squelch duration time if squelch is true * @return false if invalid squelch duration
*/
void
squelch(PublicKey const& validator, bool squelch, uint64_t squelchDuration);
/** Are the messages to this validator squelched
* @param validator Validator's public key
* @return true if squelched
*/ */
bool bool
isSquelched(PublicKey const& validator); addSquelch(
PublicKey const& validator,
std::chrono::seconds const& squelchDuration);
/** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and /** Remove the squelch
* MAX_UNSQUELCH_EXPIRE */ * @param validator The validator's public key
static seconds */
getSquelchDuration(); void
removeSquelch(PublicKey const& validator);
/** Remove expired squelch
* @param validator Validator's public key
* @return true if removed or doesn't exist, false if still active
*/
bool
expireSquelch(PublicKey const& validator);
private: private:
/** Maintains the list of squelched relaying to downstream peers. /** Maintains the list of squelched relaying to downstream peers.
* Expiration time is included in the TMSquelch message. */ * Expiration time is included in the TMSquelch message. */
hash_map<PublicKey, time_point> squelched_; hash_map<PublicKey, time_point> squelched_;
beast::Journal const journal_;
}; };
template <typename clock_type>
void
Squelch<clock_type>::squelch(
PublicKey const& validator,
bool squelch,
uint64_t squelchDuration)
{
if (squelch)
{
squelched_[validator] = [squelchDuration]() {
seconds duration = seconds(squelchDuration);
return clock_type::now() +
((duration >= MIN_UNSQUELCH_EXPIRE &&
duration <= MAX_UNSQUELCH_EXPIRE)
? duration
: getSquelchDuration());
}();
}
else
squelched_.erase(validator);
}
template <typename clock_type> template <typename clock_type>
bool bool
Squelch<clock_type>::isSquelched(PublicKey const& validator) Squelch<clock_type>::addSquelch(
PublicKey const& validator,
std::chrono::seconds const& squelchDuration)
{ {
auto now = clock_type::now(); if (squelchDuration >= MIN_UNSQUELCH_EXPIRE &&
squelchDuration <= MAX_UNSQUELCH_EXPIRE_PEERS)
auto const& it = squelched_.find(validator); {
if (it == squelched_.end()) squelched_[validator] = clock_type::now() + squelchDuration;
return false;
else if (it->second > now)
return true; return true;
}
squelched_.erase(it); JLOG(journal_.error()) << "squelch: invalid squelch duration "
<< squelchDuration.count();
// unsquelch if invalid duration
removeSquelch(validator);
return false; return false;
} }
template <typename clock_type> template <typename clock_type>
seconds void
Squelch<clock_type>::getSquelchDuration() Squelch<clock_type>::removeSquelch(PublicKey const& validator)
{ {
auto d = seconds(ripple::rand_int( squelched_.erase(validator);
MIN_UNSQUELCH_EXPIRE.count(), MAX_UNSQUELCH_EXPIRE.count()));
return d;
} }
} // namespace squelch template <typename clock_type>
bool
Squelch<clock_type>::expireSquelch(PublicKey const& validator)
{
auto now = clock_type::now();
auto const& it = squelched_.find(validator);
if (it == squelched_.end())
return true;
else if (it->second > now)
return false;
// squelch expired
squelched_.erase(it);
return true;
}
} // namespace reduce_relay
} // namespace ripple } // namespace ripple

View File

@@ -202,7 +202,9 @@ ConnectAttempt::onHandshake(error_code ec)
return close(); // makeSharedValue logs return close(); // makeSharedValue logs
req_ = makeRequest( req_ = makeRequest(
!overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION); !overlay_.peerFinder().config().peerPrivate,
app_.config().COMPRESSION,
app_.config().VP_REDUCE_RELAY_ENABLE);
buildHandshake( buildHandshake(
req_, req_,
@@ -281,23 +283,6 @@ ConnectAttempt::onShutdown(error_code ec)
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
auto
ConnectAttempt::makeRequest(bool crawl, bool compressionEnabled) -> request_type
{
request_type m;
m.method(boost::beast::http::verb::get);
m.target("/");
m.version(11);
m.insert("User-Agent", BuildInfo::getFullVersionString());
m.insert("Upgrade", supportedProtocolVersions());
m.insert("Connection", "Upgrade");
m.insert("Connect-As", "Peer");
m.insert("Crawl", crawl ? "public" : "private");
if (compressionEnabled)
m.insert("X-Offer-Compression", "lz4");
return m;
}
void void
ConnectAttempt::processResponse() ConnectAttempt::processResponse()
{ {

View File

@@ -104,10 +104,6 @@ private:
onRead(error_code ec); onRead(error_code ec);
void void
onShutdown(error_code ec); onShutdown(error_code ec);
static request_type
makeRequest(bool crawl, bool compressionEnabled);
void void
processResponse(); processResponse();

View File

@@ -34,6 +34,67 @@
namespace ripple { namespace ripple {
std::optional<std::string>
getFeatureValue(
boost::beast::http::fields const& headers,
std::string const& feature)
{
auto const header = headers.find("X-Protocol-Ctl");
if (header == headers.end())
return {};
boost::smatch match;
boost::regex rx(feature + "=([^;\\s]+)");
auto const value = header->value().to_string();
if (boost::regex_search(value, match, rx))
return {match[1]};
return {};
}
bool
isFeatureValue(
boost::beast::http::fields const& headers,
std::string const& feature,
std::string const& value)
{
if (auto const fvalue = getFeatureValue(headers, feature))
return beast::rfc2616::token_in_list(fvalue.value(), value);
return false;
}
bool
featureEnabled(
boost::beast::http::fields const& headers,
std::string const& feature)
{
return isFeatureValue(headers, feature, "1");
}
std::string
makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled)
{
std::stringstream str;
if (comprEnabled)
str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE;
if (vpReduceRelayEnabled)
str << FEATURE_VPRR << "=1";
return str.str();
}
std::string
makeFeaturesResponseHeader(
http_request_type const& headers,
bool comprEnabled,
bool vpReduceRelayEnabled)
{
std::stringstream str;
if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4"))
str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE;
if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR))
str << FEATURE_VPRR << "=1";
return str.str();
}
/** Hashes the latest finished message from an SSL stream. /** Hashes the latest finished message from an SSL stream.
@param ssl the session to get the message from. @param ssl the session to get the message from.
@@ -48,7 +109,7 @@ namespace ripple {
this topic, see https://github.com/openssl/openssl/issues/5509 and this topic, see https://github.com/openssl/openssl/issues/5509 and
https://github.com/ripple/rippled/issues/2413. https://github.com/ripple/rippled/issues/2413.
*/ */
static boost::optional<base_uint<512>> static std::optional<base_uint<512>>
hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t)) hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t))
{ {
constexpr std::size_t sslMinimumFinishedLength = 12; constexpr std::size_t sslMinimumFinishedLength = 12;
@@ -57,7 +118,7 @@ hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t))
size_t len = get(ssl, buf, sizeof(buf)); size_t len = get(ssl, buf, sizeof(buf));
if (len < sslMinimumFinishedLength) if (len < sslMinimumFinishedLength)
return boost::none; return std::nullopt;
sha512_hasher h; sha512_hasher h;
@@ -66,14 +127,14 @@ hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t))
return cookie; return cookie;
} }
boost::optional<uint256> std::optional<uint256>
makeSharedValue(stream_type& ssl, beast::Journal journal) makeSharedValue(stream_type& ssl, beast::Journal journal)
{ {
auto const cookie1 = hashLastMessage(ssl.native_handle(), SSL_get_finished); auto const cookie1 = hashLastMessage(ssl.native_handle(), SSL_get_finished);
if (!cookie1) if (!cookie1)
{ {
JLOG(journal.error()) << "Cookie generation: local setup not complete"; JLOG(journal.error()) << "Cookie generation: local setup not complete";
return boost::none; return std::nullopt;
} }
auto const cookie2 = auto const cookie2 =
@@ -81,7 +142,7 @@ makeSharedValue(stream_type& ssl, beast::Journal journal)
if (!cookie2) if (!cookie2)
{ {
JLOG(journal.error()) << "Cookie generation: peer setup not complete"; JLOG(journal.error()) << "Cookie generation: peer setup not complete";
return boost::none; return std::nullopt;
} }
auto const result = (*cookie1 ^ *cookie2); auto const result = (*cookie1 ^ *cookie2);
@@ -92,7 +153,7 @@ makeSharedValue(stream_type& ssl, beast::Journal journal)
{ {
JLOG(journal.error()) JLOG(journal.error())
<< "Cookie generation: identical finished messages"; << "Cookie generation: identical finished messages";
return boost::none; return std::nullopt;
} }
return sha512Half(Slice(result.data(), result.size())); return sha512Half(Slice(result.data(), result.size()));
@@ -102,7 +163,7 @@ void
buildHandshake( buildHandshake(
boost::beast::http::fields& h, boost::beast::http::fields& h,
ripple::uint256 const& sharedValue, ripple::uint256 const& sharedValue,
boost::optional<std::uint32_t> networkID, std::optional<std::uint32_t> networkID,
beast::IP::Address public_ip, beast::IP::Address public_ip,
beast::IP::Address remote_ip, beast::IP::Address remote_ip,
Application& app) Application& app)
@@ -155,7 +216,7 @@ PublicKey
verifyHandshake( verifyHandshake(
boost::beast::http::fields const& headers, boost::beast::http::fields const& headers,
ripple::uint256 const& sharedValue, ripple::uint256 const& sharedValue,
boost::optional<std::uint32_t> networkID, std::optional<std::uint32_t> networkID,
beast::IP::Address public_ip, beast::IP::Address public_ip,
beast::IP::Address remote, beast::IP::Address remote,
Application& app) Application& app)
@@ -291,4 +352,54 @@ verifyHandshake(
return publicKey; return publicKey;
} }
auto
makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled)
-> request_type
{
request_type m;
m.method(boost::beast::http::verb::get);
m.target("/");
m.version(11);
m.insert("User-Agent", BuildInfo::getFullVersionString());
m.insert("Upgrade", supportedProtocolVersions());
m.insert("Connection", "Upgrade");
m.insert("Connect-As", "Peer");
m.insert("Crawl", crawlPublic ? "public" : "private");
m.insert(
"X-Protocol-Ctl",
makeFeaturesRequestHeader(comprEnabled, vpReduceRelayEnabled));
return m;
}
http_response_type
makeResponse(
bool crawlPublic,
http_request_type const& req,
beast::IP::Address public_ip,
beast::IP::Address remote_ip,
uint256 const& sharedValue,
std::optional<std::uint32_t> networkID,
ProtocolVersion protocol,
Application& app)
{
http_response_type resp;
resp.result(boost::beast::http::status::switching_protocols);
resp.version(req.version());
resp.insert("Connection", "Upgrade");
resp.insert("Upgrade", to_string(protocol));
resp.insert("Connect-As", "Peer");
resp.insert("Server", BuildInfo::getFullVersionString());
resp.insert("Crawl", crawlPublic ? "public" : "private");
resp.insert(
"X-Protocol-Ctl",
makeFeaturesResponseHeader(
req,
app.config().COMPRESSION,
app.config().VP_REDUCE_RELAY_ENABLE));
buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app);
return resp;
}
} // namespace ripple } // namespace ripple

View File

@@ -22,6 +22,7 @@
#include <ripple/app/main/Application.h> #include <ripple/app/main/Application.h>
#include <ripple/beast/utility/Journal.h> #include <ripple/beast/utility/Journal.h>
#include <ripple/overlay/impl/ProtocolVersion.h>
#include <ripple/protocol/BuildInfo.h> #include <ripple/protocol/BuildInfo.h>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp> #include <boost/asio/ssl/context.hpp>
@@ -30,14 +31,22 @@
#include <boost/beast/ssl/ssl_stream.hpp> #include <boost/beast/ssl/ssl_stream.hpp>
#include <boost/asio/ssl.hpp> #include <boost/asio/ssl.hpp>
#include <boost/beast/http/dynamic_body.hpp>
#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/fields.hpp> #include <boost/beast/http/fields.hpp>
#include <boost/optional.hpp> #include <optional>
#include <utility> #include <utility>
namespace ripple { namespace ripple {
using socket_type = boost::beast::tcp_stream; using socket_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<socket_type>; using stream_type = boost::beast::ssl_stream<socket_type>;
using request_type =
boost::beast::http::request<boost::beast::http::empty_body>;
using http_request_type =
boost::beast::http::request<boost::beast::http::dynamic_body>;
using http_response_type =
boost::beast::http::response<boost::beast::http::dynamic_body>;
/** Computes a shared value based on the SSL connection state. /** Computes a shared value based on the SSL connection state.
@@ -48,7 +57,7 @@ using stream_type = boost::beast::ssl_stream<socket_type>;
@param ssl the SSL/TLS connection state. @param ssl the SSL/TLS connection state.
@return A 256-bit value on success; an unseated optional otherwise. @return A 256-bit value on success; an unseated optional otherwise.
*/ */
boost::optional<uint256> std::optional<uint256>
makeSharedValue(stream_type& ssl, beast::Journal journal); makeSharedValue(stream_type& ssl, beast::Journal journal);
/** Insert fields headers necessary for upgrading the link to the peer protocol. /** Insert fields headers necessary for upgrading the link to the peer protocol.
@@ -57,7 +66,7 @@ void
buildHandshake( buildHandshake(
boost::beast::http::fields& h, boost::beast::http::fields& h,
uint256 const& sharedValue, uint256 const& sharedValue,
boost::optional<std::uint32_t> networkID, std::optional<std::uint32_t> networkID,
beast::IP::Address public_ip, beast::IP::Address public_ip,
beast::IP::Address remote_ip, beast::IP::Address remote_ip,
Application& app); Application& app);
@@ -77,11 +86,144 @@ PublicKey
verifyHandshake( verifyHandshake(
boost::beast::http::fields const& headers, boost::beast::http::fields const& headers,
uint256 const& sharedValue, uint256 const& sharedValue,
boost::optional<std::uint32_t> networkID, std::optional<std::uint32_t> networkID,
beast::IP::Address public_ip, beast::IP::Address public_ip,
beast::IP::Address remote, beast::IP::Address remote,
Application& app); Application& app);
/** Make outbound http request
@param crawlPublic if true then server's IP/Port are included in crawl
@param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@return http request with empty body
*/
request_type
makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled);
/** Make http response
@param crawlPublic if true then server's IP/Port are included in crawl
@param req incoming http request
@param public_ip server's public IP
@param remote_ip peer's IP
@param sharedValue shared value based on the SSL connection state
@param networkID specifies what network we intend to connect to
@param version supported protocol version
@param app Application's reference to access some common properties
@return http response
*/
http_response_type
makeResponse(
bool crawlPublic,
http_request_type const& req,
beast::IP::Address public_ip,
beast::IP::Address remote_ip,
uint256 const& sharedValue,
std::optional<std::uint32_t> networkID,
ProtocolVersion version,
Application& app);
// Protocol features negotiated via HTTP handshake.
// The format is:
// X-Protocol-Ctl: feature1=value1[,value2]*[\s*;\s*feature2=value1[,value2]*]*
// value: \S+
static constexpr char FEATURE_COMPR[] = "compr"; // compression
static constexpr char FEATURE_VPRR[] =
"vprr"; // validation/proposal reduce-relay
static constexpr char DELIM_FEATURE[] = ";";
static constexpr char DELIM_VALUE[] = ",";
/** Get feature's header value
@param headers request/response header
@param feature name
@return seated optional with feature's value if the feature
is found in the header, unseated optional otherwise
*/
std::optional<std::string>
getFeatureValue(
boost::beast::http::fields const& headers,
std::string const& feature);
/** Check if a feature's value is equal to the specified value
@param headers request/response header
@param feature to check
@param value of the feature to check, must be a single value; i.e. not
value1,value2...
@return true if the feature's value matches the specified value, false if
doesn't match or the feature is not found in the header
*/
bool
isFeatureValue(
boost::beast::http::fields const& headers,
std::string const& feature,
std::string const& value);
/** Check if a feature is enabled
@param headers request/response header
@param feature to check
@return true if enabled
*/
bool
featureEnabled(
boost::beast::http::fields const& headers,
std::string const& feature);
/** Check if a feature should be enabled for a peer. The feature
is enabled if its configured value is true and the http header
has the specified feature value.
@tparam headers request (inbound) or response (outbound) header
@param request http headers
@param feature to check
@param config feature's configuration value
@param value feature's value to check in the headers
@return true if the feature is enabled
*/
template <typename headers>
bool
peerFeatureEnabled(
headers const& request,
std::string const& feature,
std::string value,
bool config)
{
return config && isFeatureValue(request, feature, value);
}
/** Wrapper for enable(1)/disable type(0) of feature */
template <typename headers>
bool
peerFeatureEnabled(
headers const& request,
std::string const& feature,
bool config)
{
return config && peerFeatureEnabled(request, feature, "1", config);
}
/** Make request header X-Protocol-Ctl value with supported features
@param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@return X-Protocol-Ctl header value
*/
std::string
makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled);
/** Make response header X-Protocol-Ctl value with supported features.
If the request has a feature that we support enabled
and the feature's configuration is enabled then enable this feature in
the response header.
@param header request's header
@param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@return X-Protocol-Ctl header value
*/
std::string
makeFeaturesResponseHeader(
http_request_type const& headers,
bool comprEnabled,
bool vpReduceRelayEnabled);
} // namespace ripple } // namespace ripple
#endif #endif

View File

@@ -1388,7 +1388,7 @@ std::shared_ptr<Message>
makeSquelchMessage( makeSquelchMessage(
PublicKey const& validator, PublicKey const& validator,
bool squelch, bool squelch,
uint64_t squelchDuration) uint32_t squelchDuration)
{ {
protocol::TMSquelch m; protocol::TMSquelch m;
m.set_squelch(squelch); m.set_squelch(squelch);
@@ -1402,12 +1402,11 @@ void
OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
{ {
if (auto peer = findPeerByShortID(id); if (auto peer = findPeerByShortID(id);
peer && app_.config().REDUCE_RELAY_SQUELCH) peer && app_.config().VP_REDUCE_RELAY_SQUELCH)
{ {
// optimize - multiple message with different // optimize - multiple message with different
// validator might be sent to the same peer // validator might be sent to the same peer
auto m = makeSquelchMessage(validator, false, 0); peer->send(makeSquelchMessage(validator, false, 0));
peer->send(m);
} }
} }
@@ -1418,10 +1417,9 @@ OverlayImpl::squelch(
uint32_t squelchDuration) const uint32_t squelchDuration) const
{ {
if (auto peer = findPeerByShortID(id); if (auto peer = findPeerByShortID(id);
peer && app_.config().REDUCE_RELAY_SQUELCH) peer && app_.config().VP_REDUCE_RELAY_SQUELCH)
{ {
auto m = makeSquelchMessage(validator, true, squelchDuration); peer->send(makeSquelchMessage(validator, true, squelchDuration));
peer->send(m);
} }
} }

View File

@@ -54,7 +54,7 @@ namespace ripple {
class PeerImp; class PeerImp;
class BasicConfig; class BasicConfig;
class OverlayImpl : public Overlay, public squelch::SquelchHandler class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler
{ {
public: public:
class Child class Child
@@ -126,7 +126,7 @@ private:
boost::optional<std::uint32_t> networkID_; boost::optional<std::uint32_t> networkID_;
squelch::Slots<UptimeClock> slots_; reduce_relay::Slots<UptimeClock> slots_;
// A message with the list of manifests we send to peers // A message with the list of manifests we send to peers
std::shared_ptr<Message> manifestMessage_; std::shared_ptr<Message> manifestMessage_;

View File

@@ -94,16 +94,30 @@ PeerImp::PeerImp(
, publicKey_(publicKey) , publicKey_(publicKey)
, lastPingTime_(clock_type::now()) , lastPingTime_(clock_type::now())
, creationTime_(clock_type::now()) , creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(consumer) , usage_(consumer)
, fee_(Resource::feeLightPeer) , fee_(Resource::feeLightPeer)
, slot_(slot) , slot_(slot)
, request_(std::move(request)) , request_(std::move(request))
, headers_(request_) , headers_(request_)
, compressionEnabled_( , compressionEnabled_(
headers_["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION peerFeatureEnabled(
headers_,
FEATURE_COMPR,
"lz4",
app_.config().COMPRESSION)
? Compressed::On ? Compressed::On
: Compressed::Off) : Compressed::Off)
, vpReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_ENABLE))
{ {
JLOG(journal_.debug()) << " compression enabled "
<< (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled "
<< vpReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
} }
PeerImp::~PeerImp() PeerImp::~PeerImp()
@@ -223,7 +237,7 @@ PeerImp::send(std::shared_ptr<Message> const& m)
return; return;
auto validator = m->getValidatorKey(); auto validator = m->getValidatorKey();
if (validator && squelch_.isSquelched(*validator)) if (validator && !squelch_.expireSquelch(*validator))
return; return;
overlay_.reportTraffic( overlay_.reportTraffic(
@@ -739,36 +753,17 @@ PeerImp::doAccept()
// XXX Set timer: connection idle (idle may vary depending on connection // XXX Set timer: connection idle (idle may vary depending on connection
// type.) // type.)
auto write_buffer = [this, sharedValue]() { auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
auto buf = std::make_shared<boost::beast::multi_buffer>();
http_response_type resp; boost::beast::ostream(*write_buffer) << makeResponse(
resp.result(boost::beast::http::status::switching_protocols); !overlay_.peerFinder().config().peerPrivate,
resp.version(request_.version()); request_,
resp.insert("Connection", "Upgrade"); overlay_.setup().public_ip,
resp.insert("Upgrade", to_string(protocol_)); remote_address_.address(),
resp.insert("Connect-As", "Peer"); *sharedValue,
resp.insert("Server", BuildInfo::getFullVersionString()); overlay_.setup().networkID,
resp.insert( protocol_,
"Crawl", app_);
overlay_.peerFinder().config().peerPrivate ? "private" : "public");
if (request_["X-Offer-Compression"] == "lz4" &&
app_.config().COMPRESSION)
resp.insert("X-Offer-Compression", "lz4");
buildHandshake(
resp,
*sharedValue,
overlay_.setup().networkID,
overlay_.setup().public_ip,
remote_address_.address(),
app_);
boost::beast::ostream(*buf) << resp;
return buf;
}();
// Write the whole buffer and only start protocol when that's done. // Write the whole buffer and only start protocol when that's done.
boost::asio::async_write( boost::asio::async_write(
@@ -971,13 +966,17 @@ void
PeerImp::onMessageBegin( PeerImp::onMessageBegin(
std::uint16_t type, std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m, std::shared_ptr<::google::protobuf::Message> const& m,
std::size_t size) std::size_t size,
std::size_t uncompressed_size,
bool isCompressed)
{ {
load_event_ = load_event_ =
app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type)); app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type));
fee_ = Resource::feeLightPeer; fee_ = Resource::feeLightPeer;
overlay_.reportTraffic( overlay_.reportTraffic(
TrafficCount::categorize(*m, type, true), true, static_cast<int>(size)); TrafficCount::categorize(*m, type, true), true, static_cast<int>(size));
JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
<< uncompressed_size << " " << isCompressed;
} }
void void
@@ -1566,12 +1565,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
{ {
// Count unique messages (Slots has it's own 'HashRouter'), which a peer // Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed. // receives within IDLED seconds since the message has been relayed.
// Wait WAIT_ON_BOOTUP time to let the server establish connections to if (reduceRelayReady() && relayed &&
// peers. (stopwatch().now() - *relayed) < reduce_relay::IDLED)
if (app_.config().REDUCE_RELAY_ENABLE && relayed &&
(stopwatch().now() - *relayed) < squelch::IDLED &&
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
overlay_.updateSlotAndSquelch( overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
JLOG(p_journal_.trace()) << "Proposal: duplicate"; JLOG(p_journal_.trace()) << "Proposal: duplicate";
@@ -2173,10 +2168,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
// peer receives within IDLED seconds since the message has been // peer receives within IDLED seconds since the message has been
// relayed. Wait WAIT_ON_BOOTUP time to let the server establish // relayed. Wait WAIT_ON_BOOTUP time to let the server establish
// connections to peers. // connections to peers.
if (app_.config().REDUCE_RELAY_ENABLE && (bool)relayed && if (reduceRelayReady() && relayed &&
(stopwatch().now() - *relayed) < squelch::IDLED && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
overlay_.updateSlotAndSquelch( overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION); key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
JLOG(p_journal_.trace()) << "Validation: duplicate"; JLOG(p_journal_.trace()) << "Validation: duplicate";
@@ -2358,6 +2351,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
{ {
using on_message_fn =
void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
(on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
if (!m->has_validatorpubkey()) if (!m->has_validatorpubkey())
{ {
charge(Resource::feeBadData); charge(Resource::feeBadData);
@@ -2371,9 +2372,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
return; return;
} }
PublicKey key(slice); PublicKey key(slice);
auto squelch = m->squelch();
auto duration = m->has_squelchduration() ? m->squelchduration() : 0;
auto sp = shared_from_this();
// Ignore the squelch for validator's own messages. // Ignore the squelch for validator's own messages.
if (key == app_.getValidationPublicKey()) if (key == app_.getValidationPublicKey())
@@ -2383,15 +2381,15 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
return; return;
} }
if (!strand_.running_in_this_thread()) std::uint32_t duration =
return post(strand_, [sp, key, squelch, duration]() { m->has_squelchduration() ? m->squelchduration() : 0;
sp->squelch_.squelch(key, squelch, duration); if (!m->squelch())
}); squelch_.removeSquelch(key);
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
charge(Resource::feeBadData);
JLOG(p_journal_.debug()) JLOG(p_journal_.debug())
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration; << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
squelch_.squelch(key, squelch, duration);
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -2555,9 +2553,7 @@ PeerImp::checkPropose(
// as part of the squelch logic. // as part of the squelch logic.
auto haveMessage = app_.overlay().relay( auto haveMessage = app_.overlay().relay(
*packet, peerPos.suppressionID(), peerPos.publicKey()); *packet, peerPos.suppressionID(), peerPos.publicKey());
if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && if (reduceRelayReady() && !haveMessage.empty())
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
overlay_.updateSlotAndSquelch( overlay_.updateSlotAndSquelch(
peerPos.suppressionID(), peerPos.suppressionID(),
peerPos.publicKey(), peerPos.publicKey(),
@@ -2592,9 +2588,7 @@ PeerImp::checkValidation(
// as part of the squelch logic. // as part of the squelch logic.
auto haveMessage = auto haveMessage =
overlay_.relay(*packet, suppression, val->getSignerPublic()); overlay_.relay(*packet, suppression, val->getSignerPublic());
if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && if (reduceRelayReady() && !haveMessage.empty())
squelch::epoch<std::chrono::minutes>(UptimeClock::now()) >
squelch::WAIT_ON_BOOTUP)
{ {
overlay_.updateSlotAndSquelch( overlay_.updateSlotAndSquelch(
suppression, suppression,
@@ -3038,6 +3032,16 @@ PeerImp::isHighLatency() const
return latency_ >= peerHighLatency; return latency_ >= peerHighLatency;
} }
bool
PeerImp::reduceRelayReady()
{
if (!reduceRelayReady_)
reduceRelayReady_ =
reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
reduce_relay::WAIT_ON_BOOTUP;
return vpReduceRelayEnabled_ && reduceRelayReady_;
}
void void
PeerImp::Metrics::add_message(std::uint64_t bytes) PeerImp::Metrics::add_message(std::uint64_t bytes)
{ {

View File

@@ -118,7 +118,8 @@ private:
clock_type::time_point lastPingTime_; clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_; clock_type::time_point const creationTime_;
squelch::Squelch<UptimeClock> squelch_; reduce_relay::Squelch<UptimeClock> squelch_;
inline static std::atomic_bool reduceRelayReady_{false};
// Notes on thread locking: // Notes on thread locking:
// //
@@ -169,6 +170,10 @@ private:
Compressed compressionEnabled_ = Compressed::Off; Compressed compressionEnabled_ = Compressed::Off;
// true if validation/proposal reduce-relay feature is enabled
// on the peer.
bool vpReduceRelayEnabled_ = false;
friend class OverlayImpl; friend class OverlayImpl;
class Metrics class Metrics
@@ -459,6 +464,11 @@ private:
void void
onWriteMessage(error_code ec, std::size_t bytes_transferred); onWriteMessage(error_code ec, std::size_t bytes_transferred);
// Check if reduce-relay feature is enabled and
// reduce_relay::WAIT_ON_BOOTUP time passed since the start
bool
reduceRelayReady();
public: public:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
@@ -473,7 +483,9 @@ public:
onMessageBegin( onMessageBegin(
std::uint16_t type, std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m, std::shared_ptr<::google::protobuf::Message> const& m,
std::size_t size); std::size_t size,
std::size_t uncompressed_size,
bool isCompressed);
void void
onMessageEnd( onMessageEnd(
@@ -594,18 +606,32 @@ PeerImp::PeerImp(
, publicKey_(publicKey) , publicKey_(publicKey)
, lastPingTime_(clock_type::now()) , lastPingTime_(clock_type::now())
, creationTime_(clock_type::now()) , creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(usage) , usage_(usage)
, fee_(Resource::feeLightPeer) , fee_(Resource::feeLightPeer)
, slot_(std::move(slot)) , slot_(std::move(slot))
, response_(std::move(response)) , response_(std::move(response))
, headers_(response_) , headers_(response_)
, compressionEnabled_( , compressionEnabled_(
headers_["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION peerFeatureEnabled(
headers_,
FEATURE_COMPR,
"lz4",
app_.config().COMPRESSION)
? Compressed::On ? Compressed::On
: Compressed::Off) : Compressed::Off)
, vpReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_ENABLE))
{ {
read_buffer_.commit(boost::asio::buffer_copy( read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers)); read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
JLOG(journal_.debug()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled "
<< vpReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
} }
template <class FwdIt, class> template <class FwdIt, class>

View File

@@ -31,6 +31,7 @@
#include <cassert> #include <cassert>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <optional>
#include <type_traits> #include <type_traits>
#include <vector> #include <vector>
@@ -124,15 +125,25 @@ buffersBegin(BufferSequence const& bufs)
bufs); bufs);
} }
template <typename BufferSequence>
auto
buffersEnd(BufferSequence const& bufs)
{
return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(
bufs);
}
/** Parse a message header /** Parse a message header
* @return a seated optional if the message header was successfully * @return a seated optional if the message header was successfully
* parsed. An unseated optional otherwise, in which case * parsed. An unseated optional otherwise, in which case
* @param ec contains more information: * @param ec contains more information:
* - set to `errc::success` if not enough bytes were present * - set to `errc::success` if not enough bytes were present
* - set to `errc::no_message` if a valid header was not present * - set to `errc::no_message` if a valid header was not present
* @bufs - sequence of input buffers, can't be empty
* @size input data size
*/ */
template <class BufferSequence> template <class BufferSequence>
boost::optional<MessageHeader> std::optional<MessageHeader>
parseMessageHeader( parseMessageHeader(
boost::system::error_code& ec, boost::system::error_code& ec,
BufferSequence const& bufs, BufferSequence const& bufs,
@@ -142,6 +153,7 @@ parseMessageHeader(
MessageHeader hdr; MessageHeader hdr;
auto iter = buffersBegin(bufs); auto iter = buffersBegin(bufs);
assert(iter != buffersEnd(bufs));
// Check valid header compressed message: // Check valid header compressed message:
// - 4 bits are the compression algorithm, 1st bit is always set to 1 // - 4 bits are the compression algorithm, 1st bit is always set to 1
@@ -156,13 +168,13 @@ parseMessageHeader(
if (size < hdr.header_size) if (size < hdr.header_size)
{ {
ec = make_error_code(boost::system::errc::success); ec = make_error_code(boost::system::errc::success);
return boost::none; return std::nullopt;
} }
if (*iter & 0x0C) if (*iter & 0x0C)
{ {
ec = make_error_code(boost::system::errc::protocol_error); ec = make_error_code(boost::system::errc::protocol_error);
return boost::none; return std::nullopt;
} }
hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0); hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
@@ -170,7 +182,7 @@ parseMessageHeader(
if (hdr.algorithm != compression::Algorithm::LZ4) if (hdr.algorithm != compression::Algorithm::LZ4)
{ {
ec = make_error_code(boost::system::errc::protocol_error); ec = make_error_code(boost::system::errc::protocol_error);
return boost::none; return std::nullopt;
} }
for (int i = 0; i != 4; ++i) for (int i = 0; i != 4; ++i)
@@ -200,7 +212,7 @@ parseMessageHeader(
if (size < hdr.header_size) if (size < hdr.header_size)
{ {
ec = make_error_code(boost::system::errc::success); ec = make_error_code(boost::system::errc::success);
return boost::none; return std::nullopt;
} }
hdr.algorithm = Algorithm::None; hdr.algorithm = Algorithm::None;
@@ -218,7 +230,7 @@ parseMessageHeader(
} }
ec = make_error_code(boost::system::errc::no_message); ec = make_error_code(boost::system::errc::no_message);
return boost::none; return std::nullopt;
} }
template < template <
@@ -268,7 +280,13 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
if (!m) if (!m)
return false; return false;
handler.onMessageBegin(header.message_type, m, header.payload_wire_size); using namespace ripple::compression;
handler.onMessageBegin(
header.message_type,
m,
header.payload_wire_size,
header.uncompressed_size,
header.algorithm != Algorithm::None);
handler.onMessage(m); handler.onMessage(m);
handler.onMessageEnd(header.message_type, m); handler.onMessageEnd(header.message_type, m);

View File

@@ -362,6 +362,6 @@ message TMSquelch
{ {
required bool squelch = 1; // squelch if true, otherwise unsquelch required bool squelch = 1; // squelch if true, otherwise unsquelch
required bytes validatorPubKey = 2; // validator's public key required bytes validatorPubKey = 2; // validator's public key
optional uint32 squelchDuration = 3; // squelch duration in milliseconds optional uint32 squelchDuration = 3; // squelch duration in seconds
} }

View File

@@ -25,6 +25,7 @@
#include <ripple/core/TimeKeeper.h> #include <ripple/core/TimeKeeper.h>
#include <ripple/overlay/Compression.h> #include <ripple/overlay/Compression.h>
#include <ripple/overlay/Message.h> #include <ripple/overlay/Message.h>
#include <ripple/overlay/impl/Handshake.h>
#include <ripple/overlay/impl/ProtocolMessage.h> #include <ripple/overlay/impl/ProtocolMessage.h>
#include <ripple/overlay/impl/ZeroCopyStream.h> #include <ripple/overlay/impl/ZeroCopyStream.h>
#include <ripple/protocol/HashPrefix.h> #include <ripple/protocol/HashPrefix.h>
@@ -112,7 +113,7 @@ public:
BEAST_EXPECT(header); BEAST_EXPECT(header);
if (header->algorithm == Algorithm::None) if (!header || header->algorithm == Algorithm::None)
return; return;
std::vector<std::uint8_t> decompressed; std::vector<std::uint8_t> decompressed;
@@ -242,7 +243,7 @@ public:
uint256 const hash(ripple::sha512Half(123456789)); uint256 const hash(ripple::sha512Half(123456789));
getLedger->set_ledgerhash(hash.begin(), hash.size()); getLedger->set_ledgerhash(hash.begin(), hash.size());
getLedger->set_ledgerseq(123456789); getLedger->set_ledgerseq(123456789);
ripple::SHAMapNodeID sha(17, hash); ripple::SHAMapNodeID sha(64, hash);
getLedger->add_nodeids(sha.getRawString()); getLedger->add_nodeids(sha.getRawString());
getLedger->set_requestcookie(123456789); getLedger->set_requestcookie(123456789);
getLedger->set_querytype(protocol::qtINDIRECT); getLedger->set_querytype(protocol::qtINDIRECT);
@@ -302,7 +303,7 @@ public:
uint256 hash(ripple::sha512Half(i)); uint256 hash(ripple::sha512Half(i));
auto object = getObject->add_objects(); auto object = getObject->add_objects();
object->set_hash(hash.data(), hash.size()); object->set_hash(hash.data(), hash.size());
ripple::SHAMapNodeID sha(i % 55, hash); ripple::SHAMapNodeID sha(64, hash);
object->set_nodeid(sha.getRawString()); object->set_nodeid(sha.getRawString());
object->set_index(""); object->set_index("");
object->set_data(""); object->set_data("");
@@ -458,14 +459,80 @@ public:
"TMValidatorListCollection"); "TMValidatorListCollection");
} }
void
testHandshake()
{
testcase("Handshake");
auto getEnv = [&](bool enable) {
Config c;
std::stringstream str;
str << "[reduce_relay]\n"
<< "vp_enable=1\n"
<< "vp_squelch=1\n"
<< "[compression]\n"
<< enable << "\n";
c.loadFromString(str.str());
auto env = std::make_shared<jtx::Env>(*this);
env->app().config().COMPRESSION = c.COMPRESSION;
env->app().config().VP_REDUCE_RELAY_ENABLE =
c.VP_REDUCE_RELAY_ENABLE;
env->app().config().VP_REDUCE_RELAY_SQUELCH =
c.VP_REDUCE_RELAY_SQUELCH;
return env;
};
auto handshake = [&](int outboundEnable, int inboundEnable) {
beast::IP::Address addr =
boost::asio::ip::address::from_string("172.1.1.100");
auto env = getEnv(outboundEnable);
auto request = ripple::makeRequest(
true,
env->app().config().COMPRESSION,
env->app().config().VP_REDUCE_RELAY_ENABLE);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();
// feature enabled on the peer's connection only if both sides are
// enabled
auto const peerEnabled = inboundEnable && outboundEnable;
// inbound is enabled if the request's header has the feature
// enabled and the peer's configuration is enabled
auto const inboundEnabled = peerFeatureEnabled(
http_request, FEATURE_COMPR, "lz4", inboundEnable);
BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
env.reset();
env = getEnv(inboundEnable);
auto http_resp = ripple::makeResponse(
true,
http_request,
addr,
addr,
uint256{1},
1,
{1, 0},
env->app());
// outbound is enabled if the response's header has the feature
// enabled and the peer's configuration is enabled
auto const outboundEnabled = peerFeatureEnabled(
http_resp, FEATURE_COMPR, "lz4", outboundEnable);
BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
};
handshake(1, 1);
handshake(1, 0);
handshake(0, 1);
handshake(0, 0);
}
void void
run() override run() override
{ {
testProtocol(); testProtocol();
testHandshake();
} }
}; };
BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20); BEAST_DEFINE_TESTSUITE_MANUAL(compression, ripple_data, ripple);
} // namespace test } // namespace test
} // namespace ripple } // namespace ripple

View File

@@ -0,0 +1,64 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/beast/unit_test.h>
#include <ripple/overlay/impl/Handshake.h>
namespace ripple {
namespace test {
class handshake_test : public beast::unit_test::suite
{
public:
handshake_test() = default;
void
testHandshake()
{
testcase("X-Protocol-Ctl");
boost::beast::http::fields headers;
headers.insert(
"X-Protocol-Ctl",
"feature1=v1,v2,v3; feature2=v4; feature3=10; feature4=1; "
"feature5=v6");
BEAST_EXPECT(!featureEnabled(headers, "feature1"));
BEAST_EXPECT(!isFeatureValue(headers, "feature1", "2"));
BEAST_EXPECT(isFeatureValue(headers, "feature1", "v1"));
BEAST_EXPECT(isFeatureValue(headers, "feature1", "v2"));
BEAST_EXPECT(isFeatureValue(headers, "feature1", "v3"));
BEAST_EXPECT(isFeatureValue(headers, "feature2", "v4"));
BEAST_EXPECT(!isFeatureValue(headers, "feature3", "1"));
BEAST_EXPECT(isFeatureValue(headers, "feature3", "10"));
BEAST_EXPECT(!isFeatureValue(headers, "feature4", "10"));
BEAST_EXPECT(isFeatureValue(headers, "feature4", "1"));
BEAST_EXPECT(!featureEnabled(headers, "v6"));
}
void
run() override
{
testHandshake();
}
};
BEAST_DEFINE_TESTSUITE(handshake, ripple_data, ripple);
} // namespace test
} // namespace ripple

View File

@@ -21,6 +21,7 @@
#include <ripple/overlay/Message.h> #include <ripple/overlay/Message.h>
#include <ripple/overlay/Peer.h> #include <ripple/overlay/Peer.h>
#include <ripple/overlay/Slot.h> #include <ripple/overlay/Slot.h>
#include <ripple/overlay/impl/Handshake.h>
#include <ripple/protocol/SecretKey.h> #include <ripple/protocol/SecretKey.h>
#include <ripple.pb.h> #include <ripple.pb.h>
#include <test/jtx/Env.h> #include <test/jtx/Env.h>
@@ -437,7 +438,8 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this<PeerSim>
{ {
public: public:
using id_t = Peer::id_t; using id_t = Peer::id_t;
PeerSim(Overlay& overlay) : overlay_(overlay) PeerSim(Overlay& overlay, beast::Journal journal)
: overlay_(overlay), squelch_(journal)
{ {
id_ = sid_++; id_ = sid_++;
} }
@@ -462,7 +464,7 @@ public:
{ {
auto validator = m->getValidatorKey(); auto validator = m->getValidatorKey();
assert(validator); assert(validator);
if (squelch_.isSquelched(*validator)) if (!squelch_.expireSquelch(*validator))
return; return;
overlay_.updateSlotAndSquelch({}, *validator, id(), f); overlay_.updateSlotAndSquelch({}, *validator, id(), f);
@@ -474,24 +476,28 @@ public:
{ {
auto validator = squelch.validatorpubkey(); auto validator = squelch.validatorpubkey();
PublicKey key(Slice(validator.data(), validator.size())); PublicKey key(Slice(validator.data(), validator.size()));
squelch_.squelch(key, squelch.squelch(), squelch.squelchduration()); if (squelch.squelch())
squelch_.addSquelch(
key, std::chrono::seconds{squelch.squelchduration()});
else
squelch_.removeSquelch(key);
} }
private: private:
inline static id_t sid_ = 0; inline static id_t sid_ = 0;
id_t id_; id_t id_;
Overlay& overlay_; Overlay& overlay_;
squelch::Squelch<ManualClock> squelch_; reduce_relay::Squelch<ManualClock> squelch_;
}; };
class OverlaySim : public Overlay, public squelch::SquelchHandler class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
{ {
using Peers = std::unordered_map<Peer::id_t, PeerSPtr>; using Peers = std::unordered_map<Peer::id_t, PeerSPtr>;
public: public:
using id_t = Peer::id_t; using id_t = Peer::id_t;
using clock_type = ManualClock; using clock_type = ManualClock;
OverlaySim(Application& app) : slots_(app, *this) OverlaySim(Application& app) : slots_(app, *this), app_(app)
{ {
} }
@@ -506,7 +512,7 @@ public:
} }
std::uint16_t std::uint16_t
inState(PublicKey const& validator, squelch::PeerState state) inState(PublicKey const& validator, reduce_relay::PeerState state)
{ {
auto res = slots_.inState(validator, state); auto res = slots_.inState(validator, state);
return res ? *res : 0; return res ? *res : 0;
@@ -545,7 +551,7 @@ public:
Peer::id_t id; Peer::id_t id;
if (peersCache_.empty() || !useCache) if (peersCache_.empty() || !useCache)
{ {
peer = std::make_shared<PeerSim>(*this); peer = std::make_shared<PeerSim>(*this, app_.journal("Squelch"));
id = peer->id(); id = peer->id();
} }
else else
@@ -602,7 +608,7 @@ public:
bool bool
isCountingState(PublicKey const& validator) isCountingState(PublicKey const& validator)
{ {
return slots_.inState(validator, squelch::SlotState::Counting); return slots_.inState(validator, reduce_relay::SlotState::Counting);
} }
std::set<id_t> std::set<id_t>
@@ -629,7 +635,7 @@ public:
std::unordered_map< std::unordered_map<
id_t, id_t,
std::tuple< std::tuple<
squelch::PeerState, reduce_relay::PeerState,
std::uint16_t, std::uint16_t,
std::uint32_t, std::uint32_t,
std::uint32_t>> std::uint32_t>>
@@ -664,7 +670,8 @@ private:
UnsquelchCB unsquelch_; UnsquelchCB unsquelch_;
Peers peers_; Peers peers_;
Peers peersCache_; Peers peersCache_;
squelch::Slots<ManualClock> slots_; reduce_relay::Slots<ManualClock> slots_;
Application& app_;
}; };
class Network class Network
@@ -843,8 +850,8 @@ public:
for (auto& [_, v] : peers) for (auto& [_, v] : peers)
{ {
(void)_; (void)_;
if (std::get<squelch::PeerState>(v) == if (std::get<reduce_relay::PeerState>(v) ==
squelch::PeerState::Squelched) reduce_relay::PeerState::Squelched)
return false; return false;
} }
} }
@@ -858,7 +865,7 @@ private:
class reduce_relay_test : public beast::unit_test::suite class reduce_relay_test : public beast::unit_test::suite
{ {
using Slot = squelch::Slot<ManualClock>; using Slot = reduce_relay::Slot<ManualClock>;
using id_t = Peer::id_t; using id_t = Peer::id_t;
protected: protected:
@@ -870,7 +877,7 @@ protected:
<< "num peers " << (int)network_.overlay().getNumPeers() << "num peers " << (int)network_.overlay().getNumPeers()
<< std::endl; << std::endl;
for (auto& [k, v] : peers) for (auto& [k, v] : peers)
std::cout << k << ":" << (int)std::get<squelch::PeerState>(v) std::cout << k << ":" << (int)std::get<reduce_relay::PeerState>(v)
<< " "; << " ";
std::cout << std::endl; std::cout << std::endl;
} }
@@ -950,7 +957,8 @@ protected:
str << s << " "; str << s << " ";
if (log) if (log)
std::cout std::cout
<< (double)squelch::epoch<milliseconds>(now).count() / << (double)reduce_relay::epoch<milliseconds>(now)
.count() /
1000. 1000.
<< " random, squelched, validator: " << validator.id() << " random, squelched, validator: " << validator.id()
<< " peers: " << str.str() << std::endl; << " peers: " << str.str() << std::endl;
@@ -958,7 +966,7 @@ protected:
network_.overlay().isCountingState(validator); network_.overlay().isCountingState(validator);
BEAST_EXPECT( BEAST_EXPECT(
countingState == false && countingState == false &&
selected.size() == squelch::MAX_SELECTED_PEERS); selected.size() == reduce_relay::MAX_SELECTED_PEERS);
} }
// Trigger Link Down or Peer Disconnect event // Trigger Link Down or Peer Disconnect event
@@ -1038,12 +1046,13 @@ protected:
event.isSelected_ = event.isSelected_ =
network_.overlay().isSelected(event.key_, event.peer_); network_.overlay().isSelected(event.key_, event.peer_);
auto peers = network_.overlay().getPeers(event.key_); auto peers = network_.overlay().getPeers(event.key_);
auto d = squelch::epoch<milliseconds>(now).count() - auto d = reduce_relay::epoch<milliseconds>(now).count() -
std::get<3>(peers[event.peer_]); std::get<3>(peers[event.peer_]);
mustHandle = event.isSelected_ && mustHandle = event.isSelected_ &&
d > milliseconds(squelch::IDLED).count() && d > milliseconds(reduce_relay::IDLED).count() &&
network_.overlay().inState( network_.overlay().inState(
event.key_, squelch::PeerState::Squelched) > 0 && event.key_, reduce_relay::PeerState::Squelched) >
0 &&
peers.find(event.peer_) != peers.end(); peers.find(event.peer_) != peers.end();
} }
network_.overlay().deleteIdlePeers( network_.overlay().deleteIdlePeers(
@@ -1062,7 +1071,7 @@ protected:
} }
if (event.state_ == State::WaitReset || if (event.state_ == State::WaitReset ||
(event.state_ == State::On && (event.state_ == State::On &&
(now - event.time_ > (squelch::IDLED + seconds(2))))) (now - event.time_ > (reduce_relay::IDLED + seconds(2)))))
{ {
bool handled = bool handled =
event.state_ == State::WaitReset || !event.handled_; event.state_ == State::WaitReset || !event.handled_;
@@ -1158,16 +1167,17 @@ protected:
if (squelched) if (squelched)
{ {
BEAST_EXPECT( BEAST_EXPECT(
squelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); squelched ==
MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
n++; n++;
} }
}, },
1, 1,
squelch::MAX_MESSAGE_THRESHOLD + 2, reduce_relay::MAX_MESSAGE_THRESHOLD + 2,
purge, purge,
resetClock); resetClock);
auto selected = network_.overlay().getSelected(network_.validator(0)); auto selected = network_.overlay().getSelected(network_.validator(0));
BEAST_EXPECT(selected.size() == squelch::MAX_SELECTED_PEERS); BEAST_EXPECT(selected.size() == reduce_relay::MAX_SELECTED_PEERS);
BEAST_EXPECT(n == 1); // only one selection round BEAST_EXPECT(n == 1); // only one selection round
auto res = checkCounting(network_.validator(0), false); auto res = checkCounting(network_.validator(0), false);
BEAST_EXPECT(res); BEAST_EXPECT(res);
@@ -1231,7 +1241,7 @@ protected:
unsquelched++; unsquelched++;
}); });
BEAST_EXPECT( BEAST_EXPECT(
unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
BEAST_EXPECT(checkCounting(network_.validator(0), true)); BEAST_EXPECT(checkCounting(network_.validator(0), true));
}); });
} }
@@ -1244,7 +1254,7 @@ protected:
doTest("Selected Peer Stops Relaying", log, [this](bool log) { doTest("Selected Peer Stops Relaying", log, [this](bool log) {
ManualClock::advance(seconds(601)); ManualClock::advance(seconds(601));
BEAST_EXPECT(propagateAndSquelch(log, true, false)); BEAST_EXPECT(propagateAndSquelch(log, true, false));
ManualClock::advance(squelch::IDLED + seconds(1)); ManualClock::advance(reduce_relay::IDLED + seconds(1));
std::uint16_t unsquelched = 0; std::uint16_t unsquelched = 0;
network_.overlay().deleteIdlePeers( network_.overlay().deleteIdlePeers(
[&](PublicKey const& key, PeerWPtr const& peer) { [&](PublicKey const& key, PeerWPtr const& peer) {
@@ -1252,7 +1262,7 @@ protected:
}); });
auto peers = network_.overlay().getPeers(network_.validator(0)); auto peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT( BEAST_EXPECT(
unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS);
BEAST_EXPECT(checkCounting(network_.validator(0), true)); BEAST_EXPECT(checkCounting(network_.validator(0), true));
}); });
} }
@@ -1267,8 +1277,8 @@ protected:
BEAST_EXPECT(propagateAndSquelch(log, true, false)); BEAST_EXPECT(propagateAndSquelch(log, true, false));
auto peers = network_.overlay().getPeers(network_.validator(0)); auto peers = network_.overlay().getPeers(network_.validator(0));
auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) {
return std::get<squelch::PeerState>(it.second) == return std::get<reduce_relay::PeerState>(it.second) ==
squelch::PeerState::Squelched; reduce_relay::PeerState::Squelched;
}); });
assert(it != peers.end()); assert(it != peers.end());
std::uint16_t unsquelched = 0; std::uint16_t unsquelched = 0;
@@ -1289,37 +1299,37 @@ protected:
std::string toLoad(R"rippleConfig( std::string toLoad(R"rippleConfig(
[reduce_relay] [reduce_relay]
enable=1 vp_enable=1
squelch=1 vp_squelch=1
)rippleConfig"); )rippleConfig");
c.loadFromString(toLoad); c.loadFromString(toLoad);
BEAST_EXPECT(c.REDUCE_RELAY_ENABLE == true); BEAST_EXPECT(c.VP_REDUCE_RELAY_ENABLE == true);
BEAST_EXPECT(c.REDUCE_RELAY_SQUELCH == true); BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH == true);
Config c1; Config c1;
toLoad = (R"rippleConfig( toLoad = (R"rippleConfig(
[reduce_relay] [reduce_relay]
enable=0 vp_enable=0
squelch=0 vp_squelch=0
)rippleConfig"); )rippleConfig");
c1.loadFromString(toLoad); c1.loadFromString(toLoad);
BEAST_EXPECT(c1.REDUCE_RELAY_ENABLE == false); BEAST_EXPECT(c1.VP_REDUCE_RELAY_ENABLE == false);
BEAST_EXPECT(c1.REDUCE_RELAY_SQUELCH == false); BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH == false);
Config c2; Config c2;
toLoad = R"rippleConfig( toLoad = R"rippleConfig(
[reduce_relay] [reduce_relay]
enabled=1 vp_enabled=1
squelched=1 vp_squelched=1
)rippleConfig"; )rippleConfig";
c2.loadFromString(toLoad); c2.loadFromString(toLoad);
BEAST_EXPECT(c2.REDUCE_RELAY_ENABLE == false); BEAST_EXPECT(c2.VP_REDUCE_RELAY_ENABLE == false);
BEAST_EXPECT(c2.REDUCE_RELAY_SQUELCH == false); BEAST_EXPECT(c2.VP_REDUCE_RELAY_SQUELCH == false);
}); });
} }
@@ -1354,7 +1364,7 @@ squelched=1
peers = network_.overlay().getPeers(network_.validator(0)); peers = network_.overlay().getPeers(network_.validator(0));
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
// advance the clock // advance the clock
ManualClock::advance(squelch::IDLED + seconds(1)); ManualClock::advance(reduce_relay::IDLED + seconds(1));
network_.overlay().updateSlotAndSquelch( network_.overlay().updateSlotAndSquelch(
key, key,
network_.validator(0), network_.validator(0),
@@ -1366,6 +1376,166 @@ squelched=1
}); });
} }
struct Handler : public reduce_relay::SquelchHandler
{
Handler() : maxDuration_(0)
{
}
void
squelch(PublicKey const&, Peer::id_t, std::uint32_t duration)
const override
{
if (duration > maxDuration_)
maxDuration_ = duration;
}
void
unsquelch(PublicKey const&, Peer::id_t) const override
{
}
mutable int maxDuration_;
};
void
testRandomSquelch(bool l)
{
doTest("Random Squelch", l, [&](bool l) {
PublicKey validator = std::get<0>(randomKeyPair(KeyType::ed25519));
Handler handler;
auto run = [&](int npeers) {
handler.maxDuration_ = 0;
reduce_relay::Slots<ManualClock> slots(env_.app(), handler);
// 1st message from a new peer switches the slot
// to counting state and resets the counts of all peers +
// MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold
// and switch the slot's state to peer selection.
for (int m = 1; m <= reduce_relay::MAX_MESSAGE_THRESHOLD + 2;
m++)
{
for (int peer = 0; peer < npeers; peer++)
{
// make unique message hash to make the
// slot's internal hash router accept the message
std::uint64_t mid = m * 1000 + peer;
uint256 const message{mid};
slots.updateSlotAndSquelch(
message,
validator,
peer,
protocol::MessageType::mtVALIDATION);
}
}
// make Slot's internal hash router expire all messages
ManualClock::advance(hours(1));
};
using namespace reduce_relay;
// expect max duration less than MAX_UNSQUELCH_EXPIRE_DEFAULT with
// less than or equal to 60 peers
run(20);
BEAST_EXPECT(
handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
run(60);
BEAST_EXPECT(
handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
// expect max duration greater than MIN_UNSQUELCH_EXPIRE and less
// than MAX_UNSQUELCH_EXPIRE_PEERS with peers greater than 60
// and less than 360
run(350);
// can't make this condition stronger. squelch
// duration is probabilistic and max condition may still fail.
// log when the value is low
BEAST_EXPECT(
handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
using namespace beast::unit_test::detail;
if (handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
log << make_reason(
"warning: squelch duration is low",
__FILE__,
__LINE__)
<< std::endl
<< std::flush;
// more than 400 is still less than MAX_UNSQUELCH_EXPIRE_PEERS
run(400);
BEAST_EXPECT(
handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
if (handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
log << make_reason(
"warning: squelch duration is low",
__FILE__,
__LINE__)
<< std::endl
<< std::flush;
});
}
void
testHandshake(bool log)
{
doTest("Handshake", log, [&](bool log) {
auto setEnv = [&](bool enable) {
Config c;
std::stringstream str;
str << "[reduce_relay]\n"
<< "vp_enable=" << enable << "\n"
<< "vp_squelch=" << enable << "\n"
<< "[compression]\n"
<< "1\n";
c.loadFromString(str.str());
env_.app().config().VP_REDUCE_RELAY_ENABLE =
c.VP_REDUCE_RELAY_ENABLE;
env_.app().config().VP_REDUCE_RELAY_SQUELCH =
c.VP_REDUCE_RELAY_SQUELCH;
env_.app().config().COMPRESSION = c.COMPRESSION;
};
auto handshake = [&](int outboundEnable, int inboundEnable) {
beast::IP::Address addr =
boost::asio::ip::address::from_string("172.1.1.100");
setEnv(outboundEnable);
auto request = ripple::makeRequest(
true,
env_.app().config().COMPRESSION,
env_.app().config().VP_REDUCE_RELAY_ENABLE);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();
// feature enabled on the peer's connection only if both sides
// are enabled
auto const peerEnabled = inboundEnable && outboundEnable;
// inbound is enabled if the request's header has the feature
// enabled and the peer's configuration is enabled
auto const inboundEnabled = peerFeatureEnabled(
http_request, FEATURE_VPRR, inboundEnable);
BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
setEnv(inboundEnable);
auto http_resp = ripple::makeResponse(
true,
http_request,
addr,
addr,
uint256{1},
1,
{1, 0},
env_.app());
// outbound is enabled if the response's header has the feature
// enabled and the peer's configuration is enabled
auto const outboundEnabled =
peerFeatureEnabled(http_resp, FEATURE_VPRR, outboundEnable);
BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
};
handshake(1, 1);
handshake(1, 0);
handshake(0, 1);
handshake(0, 0);
});
}
jtx::Env env_; jtx::Env env_;
Network network_; Network network_;
@@ -1387,6 +1557,8 @@ public:
testSelectedPeerDisconnects(log); testSelectedPeerDisconnects(log);
testSelectedPeerStopsRelaying(log); testSelectedPeerStopsRelaying(log);
testInternalHashRouter(log); testInternalHashRouter(log);
testRandomSquelch(log);
testHandshake(log);
} }
}; };