diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 82b27ee18..4d0bfb2de 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -859,6 +859,7 @@ target_sources (rippled PRIVATE src/test/overlay/short_read_test.cpp src/test/overlay/compression_test.cpp src/test/overlay/reduce_relay_test.cpp + src/test/overlay/handshake_test.cpp #[===============================[ test sources: subdir: peerfinder diff --git a/bin/ci/ubuntu/build-and-test.sh b/bin/ci/ubuntu/build-and-test.sh index 92d8bc10e..7ae75f2b1 100755 --- a/bin/ci/ubuntu/build-and-test.sh +++ b/bin/ci/ubuntu/build-and-test.sh @@ -168,6 +168,7 @@ else 'ripple.tx.OversizeMeta' 'ripple.consensus.DistributedValidators' 'ripple.app.NoRippleCheckLimits' + 'ripple.ripple_data.compression' 'ripple.NodeStore.Timing' 'ripple.consensus.ByzantineFailureSim' 'beast.chrono.abstract_clock' diff --git a/src/ripple/beast/rfc2616.h b/src/ripple/beast/rfc2616.h index deac23550..5aff5526a 100644 --- a/src/ripple/beast/rfc2616.h +++ b/src/ripple/beast/rfc2616.h @@ -37,6 +37,19 @@ namespace rfc2616 { namespace detail { +struct ci_equal_pred +{ + explicit ci_equal_pred() = default; + + bool + operator()(char c1, char c2) + { + // VFALCO TODO Use a table lookup here + return std::tolower(static_cast(c1)) == + std::tolower(static_cast(c2)); + } +}; + /** Returns `true` if `c` is linear white space. This excludes the CRLF sequence allowed for line continuations. @@ -195,6 +208,179 @@ split_commas(boost::beast::string_view const& s) return split_commas(s.begin(), s.end()); } +//------------------------------------------------------------------------------ + +/** Iterates through a comma separated list. + + Meets the requirements of ForwardIterator. + + List defined in rfc2616 2.1. + + @note Values returned may contain backslash escapes. +*/ +class list_iterator +{ + using iter_type = boost::string_ref::const_iterator; + + iter_type it_; + iter_type end_; + boost::string_ref value_; + +public: + using value_type = boost::string_ref; + using pointer = value_type const*; + using reference = value_type const&; + using difference_type = std::ptrdiff_t; + using iterator_category = std::forward_iterator_tag; + + list_iterator(iter_type begin, iter_type end) : it_(begin), end_(end) + { + if (it_ != end_) + increment(); + } + + bool + operator==(list_iterator const& other) const + { + return other.it_ == it_ && other.end_ == end_ && + other.value_.size() == value_.size(); + } + + bool + operator!=(list_iterator const& other) const + { + return !(*this == other); + } + + reference + operator*() const + { + return value_; + } + + pointer + operator->() const + { + return &*(*this); + } + + list_iterator& + operator++() + { + increment(); + return *this; + } + + list_iterator + operator++(int) + { + auto temp = *this; + ++(*this); + return temp; + } + +private: + template + void + increment(); +}; + +template +void +list_iterator::increment() +{ + using namespace detail; + value_.clear(); + while (it_ != end_) + { + if (*it_ == '"') + { + // quoted-string + ++it_; + if (it_ == end_) + return; + if (*it_ != '"') + { + auto start = it_; + for (;;) + { + ++it_; + if (it_ == end_) + { + value_ = boost::string_ref( + &*start, std::distance(start, it_)); + return; + } + if (*it_ == '"') + { + value_ = boost::string_ref( + &*start, std::distance(start, it_)); + ++it_; + return; + } + } + } + ++it_; + } + else if (*it_ == ',') + { + it_++; + continue; + } + else if (is_lws(*it_)) + { + ++it_; + continue; + } + else + { + auto start = it_; + for (;;) + { + ++it_; + if (it_ == end_ || *it_ == ',' || is_lws(*it_)) + { + value_ = + boost::string_ref(&*start, std::distance(start, it_)); + return; + } + } + } + } +} +/** Returns true if two strings are equal. + + A case-insensitive comparison is used. +*/ +inline bool +ci_equal(boost::string_ref s1, boost::string_ref s2) +{ + return boost::range::equal(s1, s2, detail::ci_equal_pred{}); +} + +/** Returns a range representing the list. */ +inline boost::iterator_range +make_list(boost::string_ref const& field) +{ + return boost::iterator_range{ + list_iterator{field.begin(), field.end()}, + list_iterator{field.end(), field.end()}}; +} + +/** Returns true if the specified token exists in the list. + + A case-insensitive comparison is used. +*/ +template +bool +token_in_list(boost::string_ref const& value, boost::string_ref const& token) +{ + for (auto const& item : make_list(value)) + if (ci_equal(item, token)) + return true; + return false; +} + template bool is_keep_alive(boost::beast::http::message const& m) diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 08582d4f2..5d948ef0b 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -191,10 +191,18 @@ public: std::size_t WORKERS = 0; // Reduce-relay - these parameters are experimental. - // Enable reduce-relay functionality - bool REDUCE_RELAY_ENABLE = false; - // Send squelch message to peers - bool REDUCE_RELAY_SQUELCH = false; + // Enable reduce-relay features + // Validation/proposal reduce-relay feature + bool VP_REDUCE_RELAY_ENABLE = false; + // Send squelch message to peers. Generally this config should + // have the same value as VP_REDUCE_RELAY_ENABLE. It can be + // used for testing the feature's function without + // affecting the message relaying. To use it for testing, + // set it to false and set VP_REDUCE_RELAY_ENABLE to true. + // Squelch messages will not be sent to the peers in this case. + // Set log level to debug so that the feature function can be + // analyzed. + bool VP_REDUCE_RELAY_SQUELCH = false; // These override the command line client settings boost::optional rpc_ip; diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index f69c3e441..9275fabb1 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -523,8 +523,8 @@ Config::loadFromString(std::string const& fileContents) if (exists(SECTION_REDUCE_RELAY)) { auto sec = section(SECTION_REDUCE_RELAY); - REDUCE_RELAY_ENABLE = sec.value_or("enable", false); - REDUCE_RELAY_SQUELCH = sec.value_or("squelch", false); + VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false); + VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false); } if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_)) diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index cf37cf72b..a9c52dd77 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -75,7 +75,7 @@ public: beast::IP::Address public_ip; int ipLimit = 0; std::uint32_t crawlOptions = 0; - boost::optional networkID; + std::optional networkID; bool vlEnabled = true; }; diff --git a/src/ripple/overlay/SquelchCommon.h b/src/ripple/overlay/ReduceRelayCommon.h similarity index 66% rename from src/ripple/overlay/SquelchCommon.h rename to src/ripple/overlay/ReduceRelayCommon.h index dea68a72d..5e7f585d8 100644 --- a/src/ripple/overlay/SquelchCommon.h +++ b/src/ripple/overlay/ReduceRelayCommon.h @@ -17,22 +17,26 @@ */ //============================================================================== -#ifndef RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED -#define RIPPLE_OVERLAY_SQUELCHCOMMON_H_INCLUDED +#ifndef RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED +#define RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED + #include namespace ripple { -namespace squelch { - -using namespace std::chrono; +namespace reduce_relay { // Peer's squelch is limited in time to -// rand{MIN_UNSQUELCH_EXPIRE, MAX_UNSQUELCH_EXPIRE} -static constexpr seconds MIN_UNSQUELCH_EXPIRE = seconds{300}; -static constexpr seconds MAX_UNSQUELCH_EXPIRE = seconds{600}; +// rand{MIN_UNSQUELCH_EXPIRE, max_squelch}, +// where max_squelch is +// min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * number_of_peers), +// MAX_UNSQUELCH_EXPIRE_PEERS) +static constexpr auto MIN_UNSQUELCH_EXPIRE = std::chrono::seconds{300}; +static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600}; +static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10); +static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600}; // No message received threshold before identifying a peer as idled -static constexpr seconds IDLED = seconds{8}; +static constexpr auto IDLED = std::chrono::seconds{8}; // Message count threshold to start selecting peers as the source // of messages from the validator. We add peers who reach // MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS @@ -40,13 +44,13 @@ static constexpr seconds IDLED = seconds{8}; static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9; static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10; // Max selected peers to choose as the source of messages from validator -static constexpr uint16_t MAX_SELECTED_PEERS = 3; +static constexpr uint16_t MAX_SELECTED_PEERS = 5; // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections -static constexpr minutes WAIT_ON_BOOTUP = minutes{10}; +static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; -} // namespace squelch +} // namespace reduce_relay } // namespace ripple -#endif // RIPPLED_SQUELCHCOMMON_H +#endif // RIPPLED_REDUCERELAYCOMMON_H_INCLUDED \ No newline at end of file diff --git a/src/ripple/overlay/Slot.h b/src/ripple/overlay/Slot.h index 57d1c137a..dbb48a921 100644 --- a/src/ripple/overlay/Slot.h +++ b/src/ripple/overlay/Slot.h @@ -25,8 +25,8 @@ #include #include #include +#include #include -#include #include #include @@ -40,7 +40,7 @@ namespace ripple { -namespace squelch { +namespace reduce_relay { template class Slots; @@ -61,7 +61,7 @@ template Unit epoch(TP const& t) { - return duration_cast(t.time_since_epoch()); + return std::chrono::duration_cast(t.time_since_epoch()); } /** Abstract class. Declares squelch and unsquelch handlers. @@ -124,10 +124,10 @@ private: /** Update peer info. If the message is from a new * peer or from a previously expired squelched peer then switch * the peer's and slot's state to Counting. If time of last - * selection round is > 2 * MAX_UNSQUELCH_EXPIRE then switch the slot's - * state to Counting. If the number of messages for the peer - * is > MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. - * If the number of considered peers who reached MAX_MESSAGE_THRESHOLD is + * selection round is > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT then switch the + * slot's state to Counting. If the number of messages for the peer is > + * MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the + * number of considered peers who reached MAX_MESSAGE_THRESHOLD is * MAX_SELECTED_PEERS then randomly select MAX_SELECTED_PEERS from * considered peers, and call squelch handler for each peer, which is not * selected and not already in Squelched state. Set the state for those @@ -197,6 +197,14 @@ private: void deleteIdlePeer(PublicKey const& validator); + /** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and + * min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * npeers), + * MAX_UNSQUELCH_EXPIRE_PEERS) + * @param npeers number of peers that can be squelched in the Slot + */ + std::chrono::seconds + getSquelchDuration(std::size_t npeers); + private: /** Reset counts of peers in Selected or Counting state */ void @@ -231,6 +239,7 @@ template void Slot::deleteIdlePeer(PublicKey const& validator) { + using namespace std::chrono; auto now = clock_type::now(); for (auto it = peers_.begin(); it != peers_.end();) { @@ -239,7 +248,7 @@ Slot::deleteIdlePeer(PublicKey const& validator) ++it; if (now - peer.lastMessage > IDLED) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "deleteIdlePeer: " << Slice(validator) << " " << id << " idled " << duration_cast(now - peer.lastMessage).count() @@ -256,12 +265,13 @@ Slot::update( id_t id, protocol::MessageType type) { + using namespace std::chrono; auto now = clock_type::now(); auto it = peers_.find(id); // First message from this peer if (it == peers_.end()) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "update: adding peer " << Slice(validator) << " " << id; peers_.emplace( std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now})); @@ -271,7 +281,7 @@ Slot::update( // Message from a peer with expired squelch if (it->second.state == PeerState::Squelched && now > it->second.expire) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "update: squelch expired " << Slice(validator) << " " << id; it->second.state = PeerState::Counting; it->second.lastMessage = now; @@ -281,7 +291,7 @@ Slot::update( auto& peer = it->second; - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "update: existing peer " << Slice(validator) << " " << id << " slot state " << static_cast(state_) << " peer state " << static_cast(peer.state) << " count " << peer.count << " last " @@ -299,9 +309,9 @@ Slot::update( if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) ++reachedThreshold_; - if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE) + if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "update: resetting due to inactivity " << Slice(validator) << " " << id << " " << duration_cast(now - lastSelected_).count(); initCounting(); @@ -338,7 +348,7 @@ Slot::update( if (selected.size() != MAX_SELECTED_PEERS) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "update: selection failed " << Slice(validator) << " " << id; initCounting(); return; @@ -347,11 +357,13 @@ Slot::update( lastSelected_ = now; auto s = selected.begin(); - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "update: " << Slice(validator) << " " << id << " pool size " << consideredPoolSize << " selected " << *s << " " << *std::next(s, 1) << " " << *std::next(s, 2); + assert(peers_.size() >= MAX_SELECTED_PEERS); + // squelch peers which are not selected and // not already squelched std::stringstream str; @@ -363,18 +375,16 @@ Slot::update( v.state = PeerState::Selected; else if (v.state != PeerState::Squelched) { - if (journal_.debug()) + if (journal_.trace()) str << k << " "; v.state = PeerState::Squelched; - auto duration = Squelch::getSquelchDuration(); + std::chrono::seconds duration = + getSquelchDuration(peers_.size() - MAX_SELECTED_PEERS); v.expire = now + duration; - handler_.squelch( - validator, - k, - duration_cast(duration).count()); + handler_.squelch(validator, k, duration.count()); } } - JLOG(journal_.debug()) << "update: squelching " << Slice(validator) + JLOG(journal_.trace()) << "update: squelching " << Slice(validator) << " " << id << " " << str.str(); considered_.clear(); reachedThreshold_ = 0; @@ -382,6 +392,22 @@ Slot::update( } } +template +std::chrono::seconds +Slot::getSquelchDuration(std::size_t npeers) +{ + using namespace std::chrono; + auto m = std::max( + MAX_UNSQUELCH_EXPIRE_DEFAULT, seconds{SQUELCH_PER_PEER * npeers}); + if (m > MAX_UNSQUELCH_EXPIRE_PEERS) + { + m = MAX_UNSQUELCH_EXPIRE_PEERS; + JLOG(journal_.warn()) + << "getSquelchDuration: unexpected squelch duration " << npeers; + } + return seconds{ripple::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)}; +} + template void Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) @@ -389,7 +415,7 @@ Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) auto it = peers_.find(id); if (it != peers_.end()) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "deletePeer: " << Slice(validator) << " " << id << " selected " << (it->second.state == PeerState::Selected) << " considered " << (considered_.find(id) != considered_.end()) << " erase " @@ -486,6 +512,7 @@ std::unordered_map< std::tuple> Slot::getPeers() const { + using namespace std::chrono; auto init = std::unordered_map< id_t, std::tuple>(); @@ -531,7 +558,6 @@ public: * @param key Message's hash * @param validator Validator's public key * @param id Peer's id which received the message - * @param id Peer's pointer which received the message * @param type Received protocol message type */ void @@ -643,7 +669,7 @@ template bool Slots::addPeerMessage(uint256 const& key, id_t id) { - beast::expire(peersWithMessage_, squelch::IDLED); + beast::expire(peersWithMessage_, reduce_relay::IDLED); if (key.isNonZero()) { @@ -686,7 +712,7 @@ Slots::updateSlotAndSquelch( auto it = slots_.find(validator); if (it == slots_.end()) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator); auto it = slots_ .emplace(std::make_pair( @@ -716,9 +742,9 @@ Slots::deleteIdlePeers() for (auto it = slots_.begin(); it != slots_.end();) { it->second.deleteIdlePeer(it->first); - if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE) + if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT) { - JLOG(journal_.debug()) + JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " << Slice(it->first); it = slots_.erase(it); } @@ -727,7 +753,7 @@ Slots::deleteIdlePeers() } } -} // namespace squelch +} // namespace reduce_relay } // namespace ripple diff --git a/src/ripple/overlay/Squelch.h b/src/ripple/overlay/Squelch.h index ab2fa65fb..175c80a0a 100644 --- a/src/ripple/overlay/Squelch.h +++ b/src/ripple/overlay/Squelch.h @@ -21,15 +21,17 @@ #define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED #include -#include +#include +#include #include +#include #include #include namespace ripple { -namespace squelch { +namespace reduce_relay { /** Maintains squelching of relaying messages from validators */ template @@ -38,84 +40,89 @@ class Squelch using time_point = typename clock_type::time_point; public: - Squelch() = default; + explicit Squelch(beast::Journal journal) : journal_(journal) + { + } virtual ~Squelch() = default; - /** Squelch/Unsquelch relaying for the validator + /** Squelch validation/proposal relaying for the validator * @param validator The validator's public key - * @param squelch Squelch/unsquelch flag - * @param squelchDuration Squelch duration time if squelch is true - */ - void - squelch(PublicKey const& validator, bool squelch, uint64_t squelchDuration); - - /** Are the messages to this validator squelched - * @param validator Validator's public key - * @return true if squelched + * @param squelchDuration Squelch duration in seconds + * @return false if invalid squelch duration */ bool - isSquelched(PublicKey const& validator); + addSquelch( + PublicKey const& validator, + std::chrono::seconds const& squelchDuration); - /** Get random squelch duration between MIN_UNSQUELCH_EXPIRE and - * MAX_UNSQUELCH_EXPIRE */ - static seconds - getSquelchDuration(); + /** Remove the squelch + * @param validator The validator's public key + */ + void + removeSquelch(PublicKey const& validator); + + /** Remove expired squelch + * @param validator Validator's public key + * @return true if removed or doesn't exist, false if still active + */ + bool + expireSquelch(PublicKey const& validator); private: /** Maintains the list of squelched relaying to downstream peers. * Expiration time is included in the TMSquelch message. */ hash_map squelched_; + beast::Journal const journal_; }; -template -void -Squelch::squelch( - PublicKey const& validator, - bool squelch, - uint64_t squelchDuration) -{ - if (squelch) - { - squelched_[validator] = [squelchDuration]() { - seconds duration = seconds(squelchDuration); - return clock_type::now() + - ((duration >= MIN_UNSQUELCH_EXPIRE && - duration <= MAX_UNSQUELCH_EXPIRE) - ? duration - : getSquelchDuration()); - }(); - } - else - squelched_.erase(validator); -} - template bool -Squelch::isSquelched(PublicKey const& validator) +Squelch::addSquelch( + PublicKey const& validator, + std::chrono::seconds const& squelchDuration) { - auto now = clock_type::now(); - - auto const& it = squelched_.find(validator); - if (it == squelched_.end()) - return false; - else if (it->second > now) + if (squelchDuration >= MIN_UNSQUELCH_EXPIRE && + squelchDuration <= MAX_UNSQUELCH_EXPIRE_PEERS) + { + squelched_[validator] = clock_type::now() + squelchDuration; return true; + } - squelched_.erase(it); + JLOG(journal_.error()) << "squelch: invalid squelch duration " + << squelchDuration.count(); + + // unsquelch if invalid duration + removeSquelch(validator); return false; } template -seconds -Squelch::getSquelchDuration() +void +Squelch::removeSquelch(PublicKey const& validator) { - auto d = seconds(ripple::rand_int( - MIN_UNSQUELCH_EXPIRE.count(), MAX_UNSQUELCH_EXPIRE.count())); - return d; + squelched_.erase(validator); } -} // namespace squelch +template +bool +Squelch::expireSquelch(PublicKey const& validator) +{ + auto now = clock_type::now(); + + auto const& it = squelched_.find(validator); + if (it == squelched_.end()) + return true; + else if (it->second > now) + return false; + + // squelch expired + squelched_.erase(it); + + return true; +} + +} // namespace reduce_relay } // namespace ripple diff --git a/src/ripple/overlay/impl/ConnectAttempt.cpp b/src/ripple/overlay/impl/ConnectAttempt.cpp index f980cdb12..62b8e3733 100644 --- a/src/ripple/overlay/impl/ConnectAttempt.cpp +++ b/src/ripple/overlay/impl/ConnectAttempt.cpp @@ -202,7 +202,9 @@ ConnectAttempt::onHandshake(error_code ec) return close(); // makeSharedValue logs req_ = makeRequest( - !overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION); + !overlay_.peerFinder().config().peerPrivate, + app_.config().COMPRESSION, + app_.config().VP_REDUCE_RELAY_ENABLE); buildHandshake( req_, @@ -281,23 +283,6 @@ ConnectAttempt::onShutdown(error_code ec) //-------------------------------------------------------------------------- -auto -ConnectAttempt::makeRequest(bool crawl, bool compressionEnabled) -> request_type -{ - request_type m; - m.method(boost::beast::http::verb::get); - m.target("/"); - m.version(11); - m.insert("User-Agent", BuildInfo::getFullVersionString()); - m.insert("Upgrade", supportedProtocolVersions()); - m.insert("Connection", "Upgrade"); - m.insert("Connect-As", "Peer"); - m.insert("Crawl", crawl ? "public" : "private"); - if (compressionEnabled) - m.insert("X-Offer-Compression", "lz4"); - return m; -} - void ConnectAttempt::processResponse() { diff --git a/src/ripple/overlay/impl/ConnectAttempt.h b/src/ripple/overlay/impl/ConnectAttempt.h index 92c31f28e..65eac6809 100644 --- a/src/ripple/overlay/impl/ConnectAttempt.h +++ b/src/ripple/overlay/impl/ConnectAttempt.h @@ -104,10 +104,6 @@ private: onRead(error_code ec); void onShutdown(error_code ec); - - static request_type - makeRequest(bool crawl, bool compressionEnabled); - void processResponse(); diff --git a/src/ripple/overlay/impl/Handshake.cpp b/src/ripple/overlay/impl/Handshake.cpp index d2f6f966d..e43b7199c 100644 --- a/src/ripple/overlay/impl/Handshake.cpp +++ b/src/ripple/overlay/impl/Handshake.cpp @@ -34,6 +34,67 @@ namespace ripple { +std::optional +getFeatureValue( + boost::beast::http::fields const& headers, + std::string const& feature) +{ + auto const header = headers.find("X-Protocol-Ctl"); + if (header == headers.end()) + return {}; + boost::smatch match; + boost::regex rx(feature + "=([^;\\s]+)"); + auto const value = header->value().to_string(); + if (boost::regex_search(value, match, rx)) + return {match[1]}; + return {}; +} + +bool +isFeatureValue( + boost::beast::http::fields const& headers, + std::string const& feature, + std::string const& value) +{ + if (auto const fvalue = getFeatureValue(headers, feature)) + return beast::rfc2616::token_in_list(fvalue.value(), value); + + return false; +} + +bool +featureEnabled( + boost::beast::http::fields const& headers, + std::string const& feature) +{ + return isFeatureValue(headers, feature, "1"); +} + +std::string +makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled) +{ + std::stringstream str; + if (comprEnabled) + str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; + if (vpReduceRelayEnabled) + str << FEATURE_VPRR << "=1"; + return str.str(); +} + +std::string +makeFeaturesResponseHeader( + http_request_type const& headers, + bool comprEnabled, + bool vpReduceRelayEnabled) +{ + std::stringstream str; + if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4")) + str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; + if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR)) + str << FEATURE_VPRR << "=1"; + return str.str(); +} + /** Hashes the latest finished message from an SSL stream. @param ssl the session to get the message from. @@ -48,7 +109,7 @@ namespace ripple { this topic, see https://github.com/openssl/openssl/issues/5509 and https://github.com/ripple/rippled/issues/2413. */ -static boost::optional> +static std::optional> hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t)) { constexpr std::size_t sslMinimumFinishedLength = 12; @@ -57,7 +118,7 @@ hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t)) size_t len = get(ssl, buf, sizeof(buf)); if (len < sslMinimumFinishedLength) - return boost::none; + return std::nullopt; sha512_hasher h; @@ -66,14 +127,14 @@ hashLastMessage(SSL const* ssl, size_t (*get)(const SSL*, void*, size_t)) return cookie; } -boost::optional +std::optional makeSharedValue(stream_type& ssl, beast::Journal journal) { auto const cookie1 = hashLastMessage(ssl.native_handle(), SSL_get_finished); if (!cookie1) { JLOG(journal.error()) << "Cookie generation: local setup not complete"; - return boost::none; + return std::nullopt; } auto const cookie2 = @@ -81,7 +142,7 @@ makeSharedValue(stream_type& ssl, beast::Journal journal) if (!cookie2) { JLOG(journal.error()) << "Cookie generation: peer setup not complete"; - return boost::none; + return std::nullopt; } auto const result = (*cookie1 ^ *cookie2); @@ -92,7 +153,7 @@ makeSharedValue(stream_type& ssl, beast::Journal journal) { JLOG(journal.error()) << "Cookie generation: identical finished messages"; - return boost::none; + return std::nullopt; } return sha512Half(Slice(result.data(), result.size())); @@ -102,7 +163,7 @@ void buildHandshake( boost::beast::http::fields& h, ripple::uint256 const& sharedValue, - boost::optional networkID, + std::optional networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application& app) @@ -155,7 +216,7 @@ PublicKey verifyHandshake( boost::beast::http::fields const& headers, ripple::uint256 const& sharedValue, - boost::optional networkID, + std::optional networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application& app) @@ -291,4 +352,54 @@ verifyHandshake( return publicKey; } +auto +makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled) + -> request_type +{ + request_type m; + m.method(boost::beast::http::verb::get); + m.target("/"); + m.version(11); + m.insert("User-Agent", BuildInfo::getFullVersionString()); + m.insert("Upgrade", supportedProtocolVersions()); + m.insert("Connection", "Upgrade"); + m.insert("Connect-As", "Peer"); + m.insert("Crawl", crawlPublic ? "public" : "private"); + m.insert( + "X-Protocol-Ctl", + makeFeaturesRequestHeader(comprEnabled, vpReduceRelayEnabled)); + return m; +} + +http_response_type +makeResponse( + bool crawlPublic, + http_request_type const& req, + beast::IP::Address public_ip, + beast::IP::Address remote_ip, + uint256 const& sharedValue, + std::optional networkID, + ProtocolVersion protocol, + Application& app) +{ + http_response_type resp; + resp.result(boost::beast::http::status::switching_protocols); + resp.version(req.version()); + resp.insert("Connection", "Upgrade"); + resp.insert("Upgrade", to_string(protocol)); + resp.insert("Connect-As", "Peer"); + resp.insert("Server", BuildInfo::getFullVersionString()); + resp.insert("Crawl", crawlPublic ? "public" : "private"); + resp.insert( + "X-Protocol-Ctl", + makeFeaturesResponseHeader( + req, + app.config().COMPRESSION, + app.config().VP_REDUCE_RELAY_ENABLE)); + + buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app); + + return resp; +} + } // namespace ripple diff --git a/src/ripple/overlay/impl/Handshake.h b/src/ripple/overlay/impl/Handshake.h index 8a92a1fd9..a93c29008 100644 --- a/src/ripple/overlay/impl/Handshake.h +++ b/src/ripple/overlay/impl/Handshake.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -30,14 +31,22 @@ #include #include +#include +#include #include -#include +#include #include namespace ripple { using socket_type = boost::beast::tcp_stream; using stream_type = boost::beast::ssl_stream; +using request_type = + boost::beast::http::request; +using http_request_type = + boost::beast::http::request; +using http_response_type = + boost::beast::http::response; /** Computes a shared value based on the SSL connection state. @@ -48,7 +57,7 @@ using stream_type = boost::beast::ssl_stream; @param ssl the SSL/TLS connection state. @return A 256-bit value on success; an unseated optional otherwise. */ -boost::optional +std::optional makeSharedValue(stream_type& ssl, beast::Journal journal); /** Insert fields headers necessary for upgrading the link to the peer protocol. @@ -57,7 +66,7 @@ void buildHandshake( boost::beast::http::fields& h, uint256 const& sharedValue, - boost::optional networkID, + std::optional networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application& app); @@ -77,11 +86,144 @@ PublicKey verifyHandshake( boost::beast::http::fields const& headers, uint256 const& sharedValue, - boost::optional networkID, + std::optional networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application& app); +/** Make outbound http request + + @param crawlPublic if true then server's IP/Port are included in crawl + @param comprEnabled if true then compression feature is enabled + @param vpReduceRelayEnabled if true then reduce-relay feature is enabled + @return http request with empty body + */ +request_type +makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled); + +/** Make http response + + @param crawlPublic if true then server's IP/Port are included in crawl + @param req incoming http request + @param public_ip server's public IP + @param remote_ip peer's IP + @param sharedValue shared value based on the SSL connection state + @param networkID specifies what network we intend to connect to + @param version supported protocol version + @param app Application's reference to access some common properties + @return http response + */ +http_response_type +makeResponse( + bool crawlPublic, + http_request_type const& req, + beast::IP::Address public_ip, + beast::IP::Address remote_ip, + uint256 const& sharedValue, + std::optional networkID, + ProtocolVersion version, + Application& app); + +// Protocol features negotiated via HTTP handshake. +// The format is: +// X-Protocol-Ctl: feature1=value1[,value2]*[\s*;\s*feature2=value1[,value2]*]* +// value: \S+ +static constexpr char FEATURE_COMPR[] = "compr"; // compression +static constexpr char FEATURE_VPRR[] = + "vprr"; // validation/proposal reduce-relay +static constexpr char DELIM_FEATURE[] = ";"; +static constexpr char DELIM_VALUE[] = ","; + +/** Get feature's header value + @param headers request/response header + @param feature name + @return seated optional with feature's value if the feature + is found in the header, unseated optional otherwise + */ +std::optional +getFeatureValue( + boost::beast::http::fields const& headers, + std::string const& feature); + +/** Check if a feature's value is equal to the specified value + @param headers request/response header + @param feature to check + @param value of the feature to check, must be a single value; i.e. not + value1,value2... + @return true if the feature's value matches the specified value, false if + doesn't match or the feature is not found in the header + */ +bool +isFeatureValue( + boost::beast::http::fields const& headers, + std::string const& feature, + std::string const& value); + +/** Check if a feature is enabled + @param headers request/response header + @param feature to check + @return true if enabled + */ +bool +featureEnabled( + boost::beast::http::fields const& headers, + std::string const& feature); + +/** Check if a feature should be enabled for a peer. The feature + is enabled if its configured value is true and the http header + has the specified feature value. + @tparam headers request (inbound) or response (outbound) header + @param request http headers + @param feature to check + @param config feature's configuration value + @param value feature's value to check in the headers + @return true if the feature is enabled + */ +template +bool +peerFeatureEnabled( + headers const& request, + std::string const& feature, + std::string value, + bool config) +{ + return config && isFeatureValue(request, feature, value); +} + +/** Wrapper for enable(1)/disable type(0) of feature */ +template +bool +peerFeatureEnabled( + headers const& request, + std::string const& feature, + bool config) +{ + return config && peerFeatureEnabled(request, feature, "1", config); +} + +/** Make request header X-Protocol-Ctl value with supported features + @param comprEnabled if true then compression feature is enabled + @param vpReduceRelayEnabled if true then reduce-relay feature is enabled + @return X-Protocol-Ctl header value + */ +std::string +makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled); + +/** Make response header X-Protocol-Ctl value with supported features. + If the request has a feature that we support enabled + and the feature's configuration is enabled then enable this feature in + the response header. + @param header request's header + @param comprEnabled if true then compression feature is enabled + @param vpReduceRelayEnabled if true then reduce-relay feature is enabled + @return X-Protocol-Ctl header value + */ +std::string +makeFeaturesResponseHeader( + http_request_type const& headers, + bool comprEnabled, + bool vpReduceRelayEnabled); + } // namespace ripple #endif diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 44973464a..7c36ea47c 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -1388,7 +1388,7 @@ std::shared_ptr makeSquelchMessage( PublicKey const& validator, bool squelch, - uint64_t squelchDuration) + uint32_t squelchDuration) { protocol::TMSquelch m; m.set_squelch(squelch); @@ -1402,12 +1402,11 @@ void OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const { if (auto peer = findPeerByShortID(id); - peer && app_.config().REDUCE_RELAY_SQUELCH) + peer && app_.config().VP_REDUCE_RELAY_SQUELCH) { // optimize - multiple message with different // validator might be sent to the same peer - auto m = makeSquelchMessage(validator, false, 0); - peer->send(m); + peer->send(makeSquelchMessage(validator, false, 0)); } } @@ -1418,10 +1417,9 @@ OverlayImpl::squelch( uint32_t squelchDuration) const { if (auto peer = findPeerByShortID(id); - peer && app_.config().REDUCE_RELAY_SQUELCH) + peer && app_.config().VP_REDUCE_RELAY_SQUELCH) { - auto m = makeSquelchMessage(validator, true, squelchDuration); - peer->send(m); + peer->send(makeSquelchMessage(validator, true, squelchDuration)); } } diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 65f5ffc52..ae04b388d 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -54,7 +54,7 @@ namespace ripple { class PeerImp; class BasicConfig; -class OverlayImpl : public Overlay, public squelch::SquelchHandler +class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler { public: class Child @@ -126,7 +126,7 @@ private: boost::optional networkID_; - squelch::Slots slots_; + reduce_relay::Slots slots_; // A message with the list of manifests we send to peers std::shared_ptr manifestMessage_; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0ecc8967a..2d7f636f8 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -94,16 +94,30 @@ PeerImp::PeerImp( , publicKey_(publicKey) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) + , squelch_(app_.journal("Squelch")) , usage_(consumer) , fee_(Resource::feeLightPeer) , slot_(slot) , request_(std::move(request)) , headers_(request_) , compressionEnabled_( - headers_["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION + peerFeatureEnabled( + headers_, + FEATURE_COMPR, + "lz4", + app_.config().COMPRESSION) ? Compressed::On : Compressed::Off) + , vpReduceRelayEnabled_(peerFeatureEnabled( + headers_, + FEATURE_VPRR, + app_.config().VP_REDUCE_RELAY_ENABLE)) { + JLOG(journal_.debug()) << " compression enabled " + << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay enabled " + << vpReduceRelayEnabled_ << " on " << remote_address_ + << " " << id_; } PeerImp::~PeerImp() @@ -223,7 +237,7 @@ PeerImp::send(std::shared_ptr const& m) return; auto validator = m->getValidatorKey(); - if (validator && squelch_.isSquelched(*validator)) + if (validator && !squelch_.expireSquelch(*validator)) return; overlay_.reportTraffic( @@ -739,36 +753,17 @@ PeerImp::doAccept() // XXX Set timer: connection idle (idle may vary depending on connection // type.) - auto write_buffer = [this, sharedValue]() { - auto buf = std::make_shared(); + auto write_buffer = std::make_shared(); - http_response_type resp; - resp.result(boost::beast::http::status::switching_protocols); - resp.version(request_.version()); - resp.insert("Connection", "Upgrade"); - resp.insert("Upgrade", to_string(protocol_)); - resp.insert("Connect-As", "Peer"); - resp.insert("Server", BuildInfo::getFullVersionString()); - resp.insert( - "Crawl", - overlay_.peerFinder().config().peerPrivate ? "private" : "public"); - - if (request_["X-Offer-Compression"] == "lz4" && - app_.config().COMPRESSION) - resp.insert("X-Offer-Compression", "lz4"); - - buildHandshake( - resp, - *sharedValue, - overlay_.setup().networkID, - overlay_.setup().public_ip, - remote_address_.address(), - app_); - - boost::beast::ostream(*buf) << resp; - - return buf; - }(); + boost::beast::ostream(*write_buffer) << makeResponse( + !overlay_.peerFinder().config().peerPrivate, + request_, + overlay_.setup().public_ip, + remote_address_.address(), + *sharedValue, + overlay_.setup().networkID, + protocol_, + app_); // Write the whole buffer and only start protocol when that's done. boost::asio::async_write( @@ -971,13 +966,17 @@ void PeerImp::onMessageBegin( std::uint16_t type, std::shared_ptr<::google::protobuf::Message> const& m, - std::size_t size) + std::size_t size, + std::size_t uncompressed_size, + bool isCompressed) { load_event_ = app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type)); fee_ = Resource::feeLightPeer; overlay_.reportTraffic( TrafficCount::categorize(*m, type, true), true, static_cast(size)); + JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " + << uncompressed_size << " " << isCompressed; } void @@ -1566,12 +1565,8 @@ PeerImp::onMessage(std::shared_ptr const& m) { // Count unique messages (Slots has it's own 'HashRouter'), which a peer // receives within IDLED seconds since the message has been relayed. - // Wait WAIT_ON_BOOTUP time to let the server establish connections to - // peers. - if (app_.config().REDUCE_RELAY_ENABLE && relayed && - (stopwatch().now() - *relayed) < squelch::IDLED && - squelch::epoch(UptimeClock::now()) > - squelch::WAIT_ON_BOOTUP) + if (reduceRelayReady() && relayed && + (stopwatch().now() - *relayed) < reduce_relay::IDLED) overlay_.updateSlotAndSquelch( suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); JLOG(p_journal_.trace()) << "Proposal: duplicate"; @@ -2173,10 +2168,8 @@ PeerImp::onMessage(std::shared_ptr const& m) // peer receives within IDLED seconds since the message has been // relayed. Wait WAIT_ON_BOOTUP time to let the server establish // connections to peers. - if (app_.config().REDUCE_RELAY_ENABLE && (bool)relayed && - (stopwatch().now() - *relayed) < squelch::IDLED && - squelch::epoch(UptimeClock::now()) > - squelch::WAIT_ON_BOOTUP) + if (reduceRelayReady() && relayed && + (stopwatch().now() - *relayed) < reduce_relay::IDLED) overlay_.updateSlotAndSquelch( key, val->getSignerPublic(), id_, protocol::mtVALIDATION); JLOG(p_journal_.trace()) << "Validation: duplicate"; @@ -2358,6 +2351,14 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + using on_message_fn = + void (PeerImp::*)(std::shared_ptr const&); + if (!strand_.running_in_this_thread()) + return post( + strand_, + std::bind( + (on_message_fn)&PeerImp::onMessage, shared_from_this(), m)); + if (!m->has_validatorpubkey()) { charge(Resource::feeBadData); @@ -2371,9 +2372,6 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } PublicKey key(slice); - auto squelch = m->squelch(); - auto duration = m->has_squelchduration() ? m->squelchduration() : 0; - auto sp = shared_from_this(); // Ignore the squelch for validator's own messages. if (key == app_.getValidationPublicKey()) @@ -2383,15 +2381,15 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } - if (!strand_.running_in_this_thread()) - return post(strand_, [sp, key, squelch, duration]() { - sp->squelch_.squelch(key, squelch, duration); - }); + std::uint32_t duration = + m->has_squelchduration() ? m->squelchduration() : 0; + if (!m->squelch()) + squelch_.removeSquelch(key); + else if (!squelch_.addSquelch(key, std::chrono::seconds{duration})) + charge(Resource::feeBadData); JLOG(p_journal_.debug()) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; - - squelch_.squelch(key, squelch, duration); } //-------------------------------------------------------------------------- @@ -2555,9 +2553,7 @@ PeerImp::checkPropose( // as part of the squelch logic. auto haveMessage = app_.overlay().relay( *packet, peerPos.suppressionID(), peerPos.publicKey()); - if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && - squelch::epoch(UptimeClock::now()) > - squelch::WAIT_ON_BOOTUP) + if (reduceRelayReady() && !haveMessage.empty()) overlay_.updateSlotAndSquelch( peerPos.suppressionID(), peerPos.publicKey(), @@ -2592,9 +2588,7 @@ PeerImp::checkValidation( // as part of the squelch logic. auto haveMessage = overlay_.relay(*packet, suppression, val->getSignerPublic()); - if (app_.config().REDUCE_RELAY_ENABLE && !haveMessage.empty() && - squelch::epoch(UptimeClock::now()) > - squelch::WAIT_ON_BOOTUP) + if (reduceRelayReady() && !haveMessage.empty()) { overlay_.updateSlotAndSquelch( suppression, @@ -3038,6 +3032,16 @@ PeerImp::isHighLatency() const return latency_ >= peerHighLatency; } +bool +PeerImp::reduceRelayReady() +{ + if (!reduceRelayReady_) + reduceRelayReady_ = + reduce_relay::epoch(UptimeClock::now()) > + reduce_relay::WAIT_ON_BOOTUP; + return vpReduceRelayEnabled_ && reduceRelayReady_; +} + void PeerImp::Metrics::add_message(std::uint64_t bytes) { diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 35fafb581..583783b21 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -118,7 +118,8 @@ private: clock_type::time_point lastPingTime_; clock_type::time_point const creationTime_; - squelch::Squelch squelch_; + reduce_relay::Squelch squelch_; + inline static std::atomic_bool reduceRelayReady_{false}; // Notes on thread locking: // @@ -169,6 +170,10 @@ private: Compressed compressionEnabled_ = Compressed::Off; + // true if validation/proposal reduce-relay feature is enabled + // on the peer. + bool vpReduceRelayEnabled_ = false; + friend class OverlayImpl; class Metrics @@ -459,6 +464,11 @@ private: void onWriteMessage(error_code ec, std::size_t bytes_transferred); + // Check if reduce-relay feature is enabled and + // reduce_relay::WAIT_ON_BOOTUP time passed since the start + bool + reduceRelayReady(); + public: //-------------------------------------------------------------------------- // @@ -473,7 +483,9 @@ public: onMessageBegin( std::uint16_t type, std::shared_ptr<::google::protobuf::Message> const& m, - std::size_t size); + std::size_t size, + std::size_t uncompressed_size, + bool isCompressed); void onMessageEnd( @@ -594,18 +606,32 @@ PeerImp::PeerImp( , publicKey_(publicKey) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) + , squelch_(app_.journal("Squelch")) , usage_(usage) , fee_(Resource::feeLightPeer) , slot_(std::move(slot)) , response_(std::move(response)) , headers_(response_) , compressionEnabled_( - headers_["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION + peerFeatureEnabled( + headers_, + FEATURE_COMPR, + "lz4", + app_.config().COMPRESSION) ? Compressed::On : Compressed::Off) + , vpReduceRelayEnabled_(peerFeatureEnabled( + headers_, + FEATURE_VPRR, + app_.config().VP_REDUCE_RELAY_ENABLE)) { read_buffer_.commit(boost::asio::buffer_copy( read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers)); + JLOG(journal_.debug()) << "compression enabled " + << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay enabled " + << vpReduceRelayEnabled_ << " on " << remote_address_ + << " " << id_; } template diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 745016d74..b5b7f80a6 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -124,15 +125,25 @@ buffersBegin(BufferSequence const& bufs) bufs); } +template +auto +buffersEnd(BufferSequence const& bufs) +{ + return boost::asio::buffers_iterator::end( + bufs); +} + /** Parse a message header * @return a seated optional if the message header was successfully * parsed. An unseated optional otherwise, in which case * @param ec contains more information: * - set to `errc::success` if not enough bytes were present * - set to `errc::no_message` if a valid header was not present + * @bufs - sequence of input buffers, can't be empty + * @size input data size */ template -boost::optional +std::optional parseMessageHeader( boost::system::error_code& ec, BufferSequence const& bufs, @@ -142,6 +153,7 @@ parseMessageHeader( MessageHeader hdr; auto iter = buffersBegin(bufs); + assert(iter != buffersEnd(bufs)); // Check valid header compressed message: // - 4 bits are the compression algorithm, 1st bit is always set to 1 @@ -156,13 +168,13 @@ parseMessageHeader( if (size < hdr.header_size) { ec = make_error_code(boost::system::errc::success); - return boost::none; + return std::nullopt; } if (*iter & 0x0C) { ec = make_error_code(boost::system::errc::protocol_error); - return boost::none; + return std::nullopt; } hdr.algorithm = static_cast(*iter & 0xF0); @@ -170,7 +182,7 @@ parseMessageHeader( if (hdr.algorithm != compression::Algorithm::LZ4) { ec = make_error_code(boost::system::errc::protocol_error); - return boost::none; + return std::nullopt; } for (int i = 0; i != 4; ++i) @@ -200,7 +212,7 @@ parseMessageHeader( if (size < hdr.header_size) { ec = make_error_code(boost::system::errc::success); - return boost::none; + return std::nullopt; } hdr.algorithm = Algorithm::None; @@ -218,7 +230,7 @@ parseMessageHeader( } ec = make_error_code(boost::system::errc::no_message); - return boost::none; + return std::nullopt; } template < @@ -268,7 +280,13 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler) if (!m) return false; - handler.onMessageBegin(header.message_type, m, header.payload_wire_size); + using namespace ripple::compression; + handler.onMessageBegin( + header.message_type, + m, + header.payload_wire_size, + header.uncompressed_size, + header.algorithm != Algorithm::None); handler.onMessage(m); handler.onMessageEnd(header.message_type, m); diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 12fa466c6..41c6c6568 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -362,6 +362,6 @@ message TMSquelch { required bool squelch = 1; // squelch if true, otherwise unsquelch required bytes validatorPubKey = 2; // validator's public key - optional uint32 squelchDuration = 3; // squelch duration in milliseconds + optional uint32 squelchDuration = 3; // squelch duration in seconds } diff --git a/src/test/overlay/compression_test.cpp b/src/test/overlay/compression_test.cpp index a83aa09b1..b91a55921 100644 --- a/src/test/overlay/compression_test.cpp +++ b/src/test/overlay/compression_test.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -112,7 +113,7 @@ public: BEAST_EXPECT(header); - if (header->algorithm == Algorithm::None) + if (!header || header->algorithm == Algorithm::None) return; std::vector decompressed; @@ -242,7 +243,7 @@ public: uint256 const hash(ripple::sha512Half(123456789)); getLedger->set_ledgerhash(hash.begin(), hash.size()); getLedger->set_ledgerseq(123456789); - ripple::SHAMapNodeID sha(17, hash); + ripple::SHAMapNodeID sha(64, hash); getLedger->add_nodeids(sha.getRawString()); getLedger->set_requestcookie(123456789); getLedger->set_querytype(protocol::qtINDIRECT); @@ -302,7 +303,7 @@ public: uint256 hash(ripple::sha512Half(i)); auto object = getObject->add_objects(); object->set_hash(hash.data(), hash.size()); - ripple::SHAMapNodeID sha(i % 55, hash); + ripple::SHAMapNodeID sha(64, hash); object->set_nodeid(sha.getRawString()); object->set_index(""); object->set_data(""); @@ -458,14 +459,80 @@ public: "TMValidatorListCollection"); } + void + testHandshake() + { + testcase("Handshake"); + auto getEnv = [&](bool enable) { + Config c; + std::stringstream str; + str << "[reduce_relay]\n" + << "vp_enable=1\n" + << "vp_squelch=1\n" + << "[compression]\n" + << enable << "\n"; + c.loadFromString(str.str()); + auto env = std::make_shared(*this); + env->app().config().COMPRESSION = c.COMPRESSION; + env->app().config().VP_REDUCE_RELAY_ENABLE = + c.VP_REDUCE_RELAY_ENABLE; + env->app().config().VP_REDUCE_RELAY_SQUELCH = + c.VP_REDUCE_RELAY_SQUELCH; + return env; + }; + auto handshake = [&](int outboundEnable, int inboundEnable) { + beast::IP::Address addr = + boost::asio::ip::address::from_string("172.1.1.100"); + + auto env = getEnv(outboundEnable); + auto request = ripple::makeRequest( + true, + env->app().config().COMPRESSION, + env->app().config().VP_REDUCE_RELAY_ENABLE); + http_request_type http_request; + http_request.version(request.version()); + http_request.base() = request.base(); + // feature enabled on the peer's connection only if both sides are + // enabled + auto const peerEnabled = inboundEnable && outboundEnable; + // inbound is enabled if the request's header has the feature + // enabled and the peer's configuration is enabled + auto const inboundEnabled = peerFeatureEnabled( + http_request, FEATURE_COMPR, "lz4", inboundEnable); + BEAST_EXPECT(!(peerEnabled ^ inboundEnabled)); + + env.reset(); + env = getEnv(inboundEnable); + auto http_resp = ripple::makeResponse( + true, + http_request, + addr, + addr, + uint256{1}, + 1, + {1, 0}, + env->app()); + // outbound is enabled if the response's header has the feature + // enabled and the peer's configuration is enabled + auto const outboundEnabled = peerFeatureEnabled( + http_resp, FEATURE_COMPR, "lz4", outboundEnable); + BEAST_EXPECT(!(peerEnabled ^ outboundEnabled)); + }; + handshake(1, 1); + handshake(1, 0); + handshake(0, 1); + handshake(0, 0); + } + void run() override { testProtocol(); + testHandshake(); } }; -BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20); +BEAST_DEFINE_TESTSUITE_MANUAL(compression, ripple_data, ripple); } // namespace test } // namespace ripple diff --git a/src/test/overlay/handshake_test.cpp b/src/test/overlay/handshake_test.cpp new file mode 100644 index 000000000..25bf20add --- /dev/null +++ b/src/test/overlay/handshake_test.cpp @@ -0,0 +1,64 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +namespace ripple { + +namespace test { + +class handshake_test : public beast::unit_test::suite +{ +public: + handshake_test() = default; + + void + testHandshake() + { + testcase("X-Protocol-Ctl"); + boost::beast::http::fields headers; + headers.insert( + "X-Protocol-Ctl", + "feature1=v1,v2,v3; feature2=v4; feature3=10; feature4=1; " + "feature5=v6"); + BEAST_EXPECT(!featureEnabled(headers, "feature1")); + BEAST_EXPECT(!isFeatureValue(headers, "feature1", "2")); + BEAST_EXPECT(isFeatureValue(headers, "feature1", "v1")); + BEAST_EXPECT(isFeatureValue(headers, "feature1", "v2")); + BEAST_EXPECT(isFeatureValue(headers, "feature1", "v3")); + BEAST_EXPECT(isFeatureValue(headers, "feature2", "v4")); + BEAST_EXPECT(!isFeatureValue(headers, "feature3", "1")); + BEAST_EXPECT(isFeatureValue(headers, "feature3", "10")); + BEAST_EXPECT(!isFeatureValue(headers, "feature4", "10")); + BEAST_EXPECT(isFeatureValue(headers, "feature4", "1")); + BEAST_EXPECT(!featureEnabled(headers, "v6")); + } + + void + run() override + { + testHandshake(); + } +}; + +BEAST_DEFINE_TESTSUITE(handshake, ripple_data, ripple); + +} // namespace test +} // namespace ripple \ No newline at end of file diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index 5e8f05d63..d7aca9199 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -437,7 +438,8 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this { public: using id_t = Peer::id_t; - PeerSim(Overlay& overlay) : overlay_(overlay) + PeerSim(Overlay& overlay, beast::Journal journal) + : overlay_(overlay), squelch_(journal) { id_ = sid_++; } @@ -462,7 +464,7 @@ public: { auto validator = m->getValidatorKey(); assert(validator); - if (squelch_.isSquelched(*validator)) + if (!squelch_.expireSquelch(*validator)) return; overlay_.updateSlotAndSquelch({}, *validator, id(), f); @@ -474,24 +476,28 @@ public: { auto validator = squelch.validatorpubkey(); PublicKey key(Slice(validator.data(), validator.size())); - squelch_.squelch(key, squelch.squelch(), squelch.squelchduration()); + if (squelch.squelch()) + squelch_.addSquelch( + key, std::chrono::seconds{squelch.squelchduration()}); + else + squelch_.removeSquelch(key); } private: inline static id_t sid_ = 0; id_t id_; Overlay& overlay_; - squelch::Squelch squelch_; + reduce_relay::Squelch squelch_; }; -class OverlaySim : public Overlay, public squelch::SquelchHandler +class OverlaySim : public Overlay, public reduce_relay::SquelchHandler { using Peers = std::unordered_map; public: using id_t = Peer::id_t; using clock_type = ManualClock; - OverlaySim(Application& app) : slots_(app, *this) + OverlaySim(Application& app) : slots_(app, *this), app_(app) { } @@ -506,7 +512,7 @@ public: } std::uint16_t - inState(PublicKey const& validator, squelch::PeerState state) + inState(PublicKey const& validator, reduce_relay::PeerState state) { auto res = slots_.inState(validator, state); return res ? *res : 0; @@ -545,7 +551,7 @@ public: Peer::id_t id; if (peersCache_.empty() || !useCache) { - peer = std::make_shared(*this); + peer = std::make_shared(*this, app_.journal("Squelch")); id = peer->id(); } else @@ -602,7 +608,7 @@ public: bool isCountingState(PublicKey const& validator) { - return slots_.inState(validator, squelch::SlotState::Counting); + return slots_.inState(validator, reduce_relay::SlotState::Counting); } std::set @@ -629,7 +635,7 @@ public: std::unordered_map< id_t, std::tuple< - squelch::PeerState, + reduce_relay::PeerState, std::uint16_t, std::uint32_t, std::uint32_t>> @@ -664,7 +670,8 @@ private: UnsquelchCB unsquelch_; Peers peers_; Peers peersCache_; - squelch::Slots slots_; + reduce_relay::Slots slots_; + Application& app_; }; class Network @@ -843,8 +850,8 @@ public: for (auto& [_, v] : peers) { (void)_; - if (std::get(v) == - squelch::PeerState::Squelched) + if (std::get(v) == + reduce_relay::PeerState::Squelched) return false; } } @@ -858,7 +865,7 @@ private: class reduce_relay_test : public beast::unit_test::suite { - using Slot = squelch::Slot; + using Slot = reduce_relay::Slot; using id_t = Peer::id_t; protected: @@ -870,7 +877,7 @@ protected: << "num peers " << (int)network_.overlay().getNumPeers() << std::endl; for (auto& [k, v] : peers) - std::cout << k << ":" << (int)std::get(v) + std::cout << k << ":" << (int)std::get(v) << " "; std::cout << std::endl; } @@ -950,7 +957,8 @@ protected: str << s << " "; if (log) std::cout - << (double)squelch::epoch(now).count() / + << (double)reduce_relay::epoch(now) + .count() / 1000. << " random, squelched, validator: " << validator.id() << " peers: " << str.str() << std::endl; @@ -958,7 +966,7 @@ protected: network_.overlay().isCountingState(validator); BEAST_EXPECT( countingState == false && - selected.size() == squelch::MAX_SELECTED_PEERS); + selected.size() == reduce_relay::MAX_SELECTED_PEERS); } // Trigger Link Down or Peer Disconnect event @@ -1038,12 +1046,13 @@ protected: event.isSelected_ = network_.overlay().isSelected(event.key_, event.peer_); auto peers = network_.overlay().getPeers(event.key_); - auto d = squelch::epoch(now).count() - + auto d = reduce_relay::epoch(now).count() - std::get<3>(peers[event.peer_]); mustHandle = event.isSelected_ && - d > milliseconds(squelch::IDLED).count() && + d > milliseconds(reduce_relay::IDLED).count() && network_.overlay().inState( - event.key_, squelch::PeerState::Squelched) > 0 && + event.key_, reduce_relay::PeerState::Squelched) > + 0 && peers.find(event.peer_) != peers.end(); } network_.overlay().deleteIdlePeers( @@ -1062,7 +1071,7 @@ protected: } if (event.state_ == State::WaitReset || (event.state_ == State::On && - (now - event.time_ > (squelch::IDLED + seconds(2))))) + (now - event.time_ > (reduce_relay::IDLED + seconds(2))))) { bool handled = event.state_ == State::WaitReset || !event.handled_; @@ -1158,16 +1167,17 @@ protected: if (squelched) { BEAST_EXPECT( - squelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + squelched == + MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS); n++; } }, 1, - squelch::MAX_MESSAGE_THRESHOLD + 2, + reduce_relay::MAX_MESSAGE_THRESHOLD + 2, purge, resetClock); auto selected = network_.overlay().getSelected(network_.validator(0)); - BEAST_EXPECT(selected.size() == squelch::MAX_SELECTED_PEERS); + BEAST_EXPECT(selected.size() == reduce_relay::MAX_SELECTED_PEERS); BEAST_EXPECT(n == 1); // only one selection round auto res = checkCounting(network_.validator(0), false); BEAST_EXPECT(res); @@ -1231,7 +1241,7 @@ protected: unsquelched++; }); BEAST_EXPECT( - unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS); BEAST_EXPECT(checkCounting(network_.validator(0), true)); }); } @@ -1244,7 +1254,7 @@ protected: doTest("Selected Peer Stops Relaying", log, [this](bool log) { ManualClock::advance(seconds(601)); BEAST_EXPECT(propagateAndSquelch(log, true, false)); - ManualClock::advance(squelch::IDLED + seconds(1)); + ManualClock::advance(reduce_relay::IDLED + seconds(1)); std::uint16_t unsquelched = 0; network_.overlay().deleteIdlePeers( [&](PublicKey const& key, PeerWPtr const& peer) { @@ -1252,7 +1262,7 @@ protected: }); auto peers = network_.overlay().getPeers(network_.validator(0)); BEAST_EXPECT( - unsquelched == MAX_PEERS - squelch::MAX_SELECTED_PEERS); + unsquelched == MAX_PEERS - reduce_relay::MAX_SELECTED_PEERS); BEAST_EXPECT(checkCounting(network_.validator(0), true)); }); } @@ -1267,8 +1277,8 @@ protected: BEAST_EXPECT(propagateAndSquelch(log, true, false)); auto peers = network_.overlay().getPeers(network_.validator(0)); auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { - return std::get(it.second) == - squelch::PeerState::Squelched; + return std::get(it.second) == + reduce_relay::PeerState::Squelched; }); assert(it != peers.end()); std::uint16_t unsquelched = 0; @@ -1289,37 +1299,37 @@ protected: std::string toLoad(R"rippleConfig( [reduce_relay] -enable=1 -squelch=1 +vp_enable=1 +vp_squelch=1 )rippleConfig"); c.loadFromString(toLoad); - BEAST_EXPECT(c.REDUCE_RELAY_ENABLE == true); - BEAST_EXPECT(c.REDUCE_RELAY_SQUELCH == true); + BEAST_EXPECT(c.VP_REDUCE_RELAY_ENABLE == true); + BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH == true); Config c1; toLoad = (R"rippleConfig( [reduce_relay] -enable=0 -squelch=0 +vp_enable=0 +vp_squelch=0 )rippleConfig"); c1.loadFromString(toLoad); - BEAST_EXPECT(c1.REDUCE_RELAY_ENABLE == false); - BEAST_EXPECT(c1.REDUCE_RELAY_SQUELCH == false); + BEAST_EXPECT(c1.VP_REDUCE_RELAY_ENABLE == false); + BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH == false); Config c2; toLoad = R"rippleConfig( [reduce_relay] -enabled=1 -squelched=1 +vp_enabled=1 +vp_squelched=1 )rippleConfig"; c2.loadFromString(toLoad); - BEAST_EXPECT(c2.REDUCE_RELAY_ENABLE == false); - BEAST_EXPECT(c2.REDUCE_RELAY_SQUELCH == false); + BEAST_EXPECT(c2.VP_REDUCE_RELAY_ENABLE == false); + BEAST_EXPECT(c2.VP_REDUCE_RELAY_SQUELCH == false); }); } @@ -1354,7 +1364,7 @@ squelched=1 peers = network_.overlay().getPeers(network_.validator(0)); BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); // advance the clock - ManualClock::advance(squelch::IDLED + seconds(1)); + ManualClock::advance(reduce_relay::IDLED + seconds(1)); network_.overlay().updateSlotAndSquelch( key, network_.validator(0), @@ -1366,6 +1376,166 @@ squelched=1 }); } + struct Handler : public reduce_relay::SquelchHandler + { + Handler() : maxDuration_(0) + { + } + void + squelch(PublicKey const&, Peer::id_t, std::uint32_t duration) + const override + { + if (duration > maxDuration_) + maxDuration_ = duration; + } + void + unsquelch(PublicKey const&, Peer::id_t) const override + { + } + mutable int maxDuration_; + }; + + void + testRandomSquelch(bool l) + { + doTest("Random Squelch", l, [&](bool l) { + PublicKey validator = std::get<0>(randomKeyPair(KeyType::ed25519)); + Handler handler; + + auto run = [&](int npeers) { + handler.maxDuration_ = 0; + reduce_relay::Slots slots(env_.app(), handler); + // 1st message from a new peer switches the slot + // to counting state and resets the counts of all peers + + // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold + // and switch the slot's state to peer selection. + for (int m = 1; m <= reduce_relay::MAX_MESSAGE_THRESHOLD + 2; + m++) + { + for (int peer = 0; peer < npeers; peer++) + { + // make unique message hash to make the + // slot's internal hash router accept the message + std::uint64_t mid = m * 1000 + peer; + uint256 const message{mid}; + slots.updateSlotAndSquelch( + message, + validator, + peer, + protocol::MessageType::mtVALIDATION); + } + } + // make Slot's internal hash router expire all messages + ManualClock::advance(hours(1)); + }; + + using namespace reduce_relay; + // expect max duration less than MAX_UNSQUELCH_EXPIRE_DEFAULT with + // less than or equal to 60 peers + run(20); + BEAST_EXPECT( + handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() && + handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + run(60); + BEAST_EXPECT( + handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() && + handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + // expect max duration greater than MIN_UNSQUELCH_EXPIRE and less + // than MAX_UNSQUELCH_EXPIRE_PEERS with peers greater than 60 + // and less than 360 + run(350); + // can't make this condition stronger. squelch + // duration is probabilistic and max condition may still fail. + // log when the value is low + BEAST_EXPECT( + handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() && + handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count()); + using namespace beast::unit_test::detail; + if (handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count()) + log << make_reason( + "warning: squelch duration is low", + __FILE__, + __LINE__) + << std::endl + << std::flush; + // more than 400 is still less than MAX_UNSQUELCH_EXPIRE_PEERS + run(400); + BEAST_EXPECT( + handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() && + handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count()); + if (handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count()) + log << make_reason( + "warning: squelch duration is low", + __FILE__, + __LINE__) + << std::endl + << std::flush; + }); + } + + void + testHandshake(bool log) + { + doTest("Handshake", log, [&](bool log) { + auto setEnv = [&](bool enable) { + Config c; + std::stringstream str; + str << "[reduce_relay]\n" + << "vp_enable=" << enable << "\n" + << "vp_squelch=" << enable << "\n" + << "[compression]\n" + << "1\n"; + c.loadFromString(str.str()); + env_.app().config().VP_REDUCE_RELAY_ENABLE = + c.VP_REDUCE_RELAY_ENABLE; + env_.app().config().VP_REDUCE_RELAY_SQUELCH = + c.VP_REDUCE_RELAY_SQUELCH; + env_.app().config().COMPRESSION = c.COMPRESSION; + }; + auto handshake = [&](int outboundEnable, int inboundEnable) { + beast::IP::Address addr = + boost::asio::ip::address::from_string("172.1.1.100"); + + setEnv(outboundEnable); + auto request = ripple::makeRequest( + true, + env_.app().config().COMPRESSION, + env_.app().config().VP_REDUCE_RELAY_ENABLE); + http_request_type http_request; + http_request.version(request.version()); + http_request.base() = request.base(); + // feature enabled on the peer's connection only if both sides + // are enabled + auto const peerEnabled = inboundEnable && outboundEnable; + // inbound is enabled if the request's header has the feature + // enabled and the peer's configuration is enabled + auto const inboundEnabled = peerFeatureEnabled( + http_request, FEATURE_VPRR, inboundEnable); + BEAST_EXPECT(!(peerEnabled ^ inboundEnabled)); + + setEnv(inboundEnable); + auto http_resp = ripple::makeResponse( + true, + http_request, + addr, + addr, + uint256{1}, + 1, + {1, 0}, + env_.app()); + // outbound is enabled if the response's header has the feature + // enabled and the peer's configuration is enabled + auto const outboundEnabled = + peerFeatureEnabled(http_resp, FEATURE_VPRR, outboundEnable); + BEAST_EXPECT(!(peerEnabled ^ outboundEnabled)); + }; + handshake(1, 1); + handshake(1, 0); + handshake(0, 1); + handshake(0, 0); + }); + } + jtx::Env env_; Network network_; @@ -1387,6 +1557,8 @@ public: testSelectedPeerDisconnects(log); testSelectedPeerStopsRelaying(log); testInternalHashRouter(log); + testRandomSquelch(log); + testHandshake(log); } };