From ea145d12c7e1bd32344bcb46f18657be679ef0ac Mon Sep 17 00:00:00 2001 From: Gregory Tsipenyuk Date: Tue, 8 Sep 2020 18:05:08 -0400 Subject: [PATCH] Improve transaction relaying logic: The existing logic involves every server sending every transaction that it receives to all its peers (except the one that it received a transaction from). This commit instead uses a randomized algorithm, where a node will randomly select peers to relay a given transaction to, caching the list of transaction hashes that are not relayed and forwading them to peers once every second. Peers can then determine whether there are transactions that they have not seen and can request them from the node which has them. It is expected that this feature will further reduce the bandwidth needed to operate a server. --- Builds/CMake/RippledCore.cmake | 3 + Builds/levelization/results/ordering.txt | 1 + src/ripple/app/consensus/RCLConsensus.cpp | 4 +- src/ripple/app/ledger/impl/OpenLedger.cpp | 4 +- src/ripple/app/misc/NetworkOPs.cpp | 4 +- src/ripple/core/Config.h | 15 ++ src/ripple/core/Job.h | 2 + src/ripple/core/JobTypes.h | 2 + src/ripple/core/impl/Config.cpp | 11 + src/ripple/overlay/Overlay.h | 19 ++ src/ripple/overlay/Peer.h | 15 ++ src/ripple/overlay/ReduceRelayCommon.h | 5 + src/ripple/overlay/Slot.h | 10 +- src/ripple/overlay/impl/ConnectAttempt.cpp | 5 +- src/ripple/overlay/impl/Handshake.cpp | 41 +-- src/ripple/overlay/impl/Handshake.h | 45 ++-- src/ripple/overlay/impl/Message.cpp | 2 + src/ripple/overlay/impl/OverlayImpl.cpp | 106 +++++++- src/ripple/overlay/impl/OverlayImpl.h | 49 ++++ src/ripple/overlay/impl/PeerImp.cpp | 268 +++++++++++++++++++- src/ripple/overlay/impl/PeerImp.h | 82 +++++- src/ripple/overlay/impl/ProtocolMessage.h | 12 + src/ripple/overlay/impl/TrafficCount.cpp | 9 + src/ripple/overlay/impl/TrafficCount.h | 20 +- src/ripple/overlay/impl/TxMetrics.cpp | 151 +++++++++++ src/ripple/overlay/impl/TxMetrics.h | 141 +++++++++++ src/ripple/proto/ripple.proto | 13 + src/ripple/protocol/jss.h | 14 ++ src/ripple/rpc/handlers/Handlers.h | 2 + src/ripple/rpc/handlers/TxReduceRelay.cpp | 33 +++ src/ripple/rpc/impl/Handler.cpp | 1 + src/test/app/LedgerReplay_test.cpp | 20 +- src/test/overlay/compression_test.cpp | 5 +- src/test/overlay/reduce_relay_test.cpp | 31 ++- src/test/overlay/tx_reduce_relay_test.cpp | 275 +++++++++++++++++++++ 35 files changed, 1349 insertions(+), 71 deletions(-) create mode 100644 src/ripple/overlay/impl/TxMetrics.cpp create mode 100644 src/ripple/overlay/impl/TxMetrics.h create mode 100644 src/ripple/rpc/handlers/TxReduceRelay.cpp create mode 100644 src/test/overlay/tx_reduce_relay_test.cpp diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index a585770d52..26f83fb798 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -535,6 +535,7 @@ target_sources (rippled PRIVATE src/ripple/overlay/impl/PeerSet.cpp src/ripple/overlay/impl/ProtocolVersion.cpp src/ripple/overlay/impl/TrafficCount.cpp + src/ripple/overlay/impl/TxMetrics.cpp #[===============================[ main sources: subdir: peerfinder @@ -613,6 +614,7 @@ target_sources (rippled PRIVATE src/ripple/rpc/handlers/TransactionEntry.cpp src/ripple/rpc/handlers/Tx.cpp src/ripple/rpc/handlers/TxHistory.cpp + src/ripple/rpc/handlers/TxReduceRelay.cpp src/ripple/rpc/handlers/UnlList.cpp src/ripple/rpc/handlers/Unsubscribe.cpp src/ripple/rpc/handlers/ValidationCreate.cpp @@ -864,6 +866,7 @@ target_sources (rippled PRIVATE src/test/overlay/compression_test.cpp src/test/overlay/reduce_relay_test.cpp src/test/overlay/handshake_test.cpp + src/test/overlay/tx_reduce_relay_test.cpp #[===============================[ test sources: subdir: peerfinder diff --git a/Builds/levelization/results/ordering.txt b/Builds/levelization/results/ordering.txt index 8efdd690c8..de0d6f7c09 100644 --- a/Builds/levelization/results/ordering.txt +++ b/Builds/levelization/results/ordering.txt @@ -163,6 +163,7 @@ test.overlay > ripple.basics test.overlay > ripple.beast test.overlay > ripple.core test.overlay > ripple.overlay +test.overlay > ripple.peerfinder test.overlay > ripple.protocol test.overlay > ripple.shamap test.overlay > test.jtx diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 11c5ff6c30..859c66d315 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -187,8 +187,8 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) msg.set_status(protocol::tsNEW); msg.set_receivetimestamp( app_.timeKeeper().now().time_since_epoch().count()); - app_.overlay().foreach(send_always( - std::make_shared(msg, protocol::mtTRANSACTION))); + static std::set skip{}; + app_.overlay().relay(tx.id(), msg, skip); } else { diff --git a/src/ripple/app/ledger/impl/OpenLedger.cpp b/src/ripple/app/ledger/impl/OpenLedger.cpp index 463e5c880a..113c46951a 100644 --- a/src/ripple/app/ledger/impl/OpenLedger.cpp +++ b/src/ripple/app/ledger/impl/OpenLedger.cpp @@ -150,9 +150,7 @@ OpenLedger::accept( msg.set_status(protocol::tsNEW); msg.set_receivetimestamp( app.timeKeeper().now().time_since_epoch().count()); - app.overlay().foreach(send_if_not( - std::make_shared(msg, protocol::mtTRANSACTION), - peer_in_set(*toSkip))); + app.overlay().relay(txId, msg, *toSkip); } } diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index b29133b1f6..ab96822359 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1342,9 +1342,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) app_.timeKeeper().now().time_since_epoch().count()); tx.set_deferred(e.result == terQUEUED); // FIXME: This should be when we received it - app_.overlay().foreach(send_if_not( - std::make_shared(tx, protocol::mtTRANSACTION), - peer_in_set(*toSkip))); + app_.overlay().relay(e.transaction->getID(), tx, *toSkip); e.transaction->setBroadcast(); } } diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 146a40a796..30117ad13d 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -217,6 +217,21 @@ public: // Set log level to debug so that the feature function can be // analyzed. bool VP_REDUCE_RELAY_SQUELCH = false; + // Transaction reduce-relay feature + bool TX_REDUCE_RELAY_ENABLE = false; + // If tx reduce-relay feature is disabled + // and this flag is enabled then some + // tx-related metrics is collected. It + // is ignored if tx reduce-relay feature is + // enabled. It is used in debugging to compare + // metrics with the feature disabled/enabled. + bool TX_REDUCE_RELAY_METRICS = false; + // Minimum peers a server should have before + // selecting random peers + std::size_t TX_REDUCE_RELAY_MIN_PEERS = 20; + // Percentage of peers with the tx reduce-relay feature enabled + // to relay to out of total active peers + std::size_t TX_RELAY_PERCENTAGE = 25; // These override the command line client settings std::optional rpc_ip; diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index a5fbf5413e..15552be70c 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -52,6 +52,8 @@ enum JobType { jtRPC, // A websocket command from the client jtUPDATE_PF, // Update pathfinding requests jtTRANSACTION, // A transaction received from the network + jtMISSING_TXN, // Request missing transactions + jtREQUESTED_TXN, // Reply with requested transactions jtBATCH, // Apply batched transactions jtADVANCE, // Advance validated/acquired ledgers jtPUBLEDGER, // Publish a fully-accepted ledger diff --git a/src/ripple/core/JobTypes.h b/src/ripple/core/JobTypes.h index 3cd7794f6f..45f69a5308 100644 --- a/src/ripple/core/JobTypes.h +++ b/src/ripple/core/JobTypes.h @@ -84,6 +84,8 @@ private: add(jtNETOP_CLUSTER, "clusterReport", 1, false, 9999ms, 9999ms); add(jtNETOP_TIMER, "heartbeat", 1, false, 999ms, 999ms); add(jtADMIN, "administration", maxLimit, false, 0ms, 0ms); + add(jtMISSING_TXN, "handleHaveTransactions", 1200, false, 0ms, 0ms); + add(jtREQUESTED_TXN, "doTransactions", 1200, false, 0ms, 0ms); add(jtPEER, "peerCommand", 0, true, 200ms, 2500ms); add(jtDISK, "diskAccess", 0, true, 500ms, 1000ms); diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index d67b39c004..08c30f0da9 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -641,6 +641,17 @@ Config::loadFromString(std::string const& fileContents) auto sec = section(SECTION_REDUCE_RELAY); VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false); VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false); + TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false); + TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false); + TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20); + TX_RELAY_PERCENTAGE = sec.value_or("tx_relay_percentage", 25); + if (TX_RELAY_PERCENTAGE < 10 || TX_RELAY_PERCENTAGE > 100 || + TX_REDUCE_RELAY_MIN_PEERS < 10) + Throw( + "Invalid " SECTION_REDUCE_RELAY + ", tx_min_peers must be greater or equal to 10" + ", tx_relay_percentage must be greater or equal to 10 " + "and less or equal to 100"); } if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_)) diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index 65050ca26f..844eafb86c 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -173,6 +173,19 @@ public: uint256 const& uid, PublicKey const& validator) = 0; + /** Relay a transaction. If the tx reduce-relay feature is enabled then + * randomly select peers to relay to and queue transaction's hash + * for the rest of the peers. + * @param hash transaction's hash + * @param m transaction's protocol message to relay + * @param toSkip peers which have already seen this transaction + */ + virtual void + relay( + uint256 const& hash, + protocol::TMTransaction& m, + std::set const& toSkip) = 0; + /** Visit every active peer. * * The visitor must be invocable as: @@ -225,6 +238,12 @@ public: */ virtual std::optional networkID() const = 0; + + /** Returns tx reduce-relay metrics + @return json value of tx reduce-relay metrics + */ + virtual Json::Value + txMetrics() const = 0; }; } // namespace ripple diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index 49dc2fa096..ba41597415 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -66,6 +66,18 @@ public: virtual beast::IP::Endpoint getRemoteAddress() const = 0; + /** Send aggregated transactions' hashes. */ + virtual void + sendTxQueue() = 0; + + /** Aggregate transaction's hash. */ + virtual void + addTxQueue(uint256 const&) = 0; + + /** Remove hash from the transactions' hashes queue. */ + virtual void + removeTxQueue(uint256 const&) = 0; + /** Adjust this peer's load balance based on the type of load imposed. */ virtual void charge(Resource::Charge const& fee) = 0; @@ -121,6 +133,9 @@ public: virtual bool compressionEnabled() const = 0; + + virtual bool + txReduceRelayEnabled() const = 0; }; } // namespace ripple diff --git a/src/ripple/overlay/ReduceRelayCommon.h b/src/ripple/overlay/ReduceRelayCommon.h index 5e7f585d8f..3b87c3c8c1 100644 --- a/src/ripple/overlay/ReduceRelayCommon.h +++ b/src/ripple/overlay/ReduceRelayCommon.h @@ -48,6 +48,11 @@ static constexpr uint16_t MAX_SELECTED_PEERS = 5; // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; +// Maximum size of the aggregated transaction hashes per peer. +// Once we get to high tps throughput, this cap will prevent +// TMTransactions from exceeding the current protocol message +// size limit of 64MB. +static constexpr std::size_t MAX_TX_QUEUE_SIZE = 10000; } // namespace reduce_relay diff --git a/src/ripple/overlay/Slot.h b/src/ripple/overlay/Slot.h index dbb48a9218..b7a2129ed8 100644 --- a/src/ripple/overlay/Slot.h +++ b/src/ripple/overlay/Slot.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED #define RIPPLE_OVERLAY_SLOT_H_INCLUDED -#include +#include #include #include #include @@ -549,8 +549,8 @@ public: * @param app Applicaton reference * @param handler Squelch/unsquelch implementation */ - Slots(Application& app, SquelchHandler const& handler) - : handler_(handler), app_(app), journal_(app.journal("Slots")) + Slots(Logs& logs, SquelchHandler const& handler) + : handler_(handler), logs_(logs), journal_(logs.journal("Slots")) { } ~Slots() = default; @@ -655,7 +655,7 @@ private: hash_map> slots_; SquelchHandler const& handler_; // squelch/unsquelch handler - Application& app_; + Logs& logs_; beast::Journal const journal_; // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message @@ -717,7 +717,7 @@ Slots::updateSlotAndSquelch( auto it = slots_ .emplace(std::make_pair( validator, - Slot(handler_, app_.journal("Slot")))) + Slot(handler_, logs_.journal("Slot")))) .first; it->second.update(validator, id, type); } diff --git a/src/ripple/overlay/impl/ConnectAttempt.cpp b/src/ripple/overlay/impl/ConnectAttempt.cpp index 911215d807..f1d1c6ffe0 100644 --- a/src/ripple/overlay/impl/ConnectAttempt.cpp +++ b/src/ripple/overlay/impl/ConnectAttempt.cpp @@ -204,8 +204,9 @@ ConnectAttempt::onHandshake(error_code ec) req_ = makeRequest( !overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION, - app_.config().VP_REDUCE_RELAY_ENABLE, - app_.config().LEDGER_REPLAY); + app_.config().LEDGER_REPLAY, + app_.config().TX_REDUCE_RELAY_ENABLE, + app_.config().VP_REDUCE_RELAY_ENABLE); buildHandshake( req_, diff --git a/src/ripple/overlay/impl/Handshake.cpp b/src/ripple/overlay/impl/Handshake.cpp index a11d6b802b..2ea208f557 100644 --- a/src/ripple/overlay/impl/Handshake.cpp +++ b/src/ripple/overlay/impl/Handshake.cpp @@ -73,16 +73,19 @@ featureEnabled( std::string makeFeaturesRequestHeader( bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled) + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled) { std::stringstream str; if (comprEnabled) str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; - if (vpReduceRelayEnabled) - str << FEATURE_VPRR << "=1"; if (ledgerReplayEnabled) - str << FEATURE_LEDGER_REPLAY << "=1"; + str << FEATURE_LEDGER_REPLAY << "=1" << DELIM_FEATURE; + if (txReduceRelayEnabled) + str << FEATURE_TXRR << "=1" << DELIM_FEATURE; + if (vpReduceRelayEnabled) + str << FEATURE_VPRR << "=1" << DELIM_FEATURE; return str.str(); } @@ -90,16 +93,19 @@ std::string makeFeaturesResponseHeader( http_request_type const& headers, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled) + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled) { std::stringstream str; if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4")) str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; - if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR)) - str << FEATURE_VPRR << "=1"; if (ledgerReplayEnabled && featureEnabled(headers, FEATURE_LEDGER_REPLAY)) - str << FEATURE_LEDGER_REPLAY << "=1"; + str << FEATURE_LEDGER_REPLAY << "=1" << DELIM_FEATURE; + if (txReduceRelayEnabled && featureEnabled(headers, FEATURE_TXRR)) + str << FEATURE_TXRR << "=1" << DELIM_FEATURE; + if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR)) + str << FEATURE_VPRR << "=1" << DELIM_FEATURE; return str.str(); } @@ -363,8 +369,9 @@ auto makeRequest( bool crawlPublic, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled) -> request_type + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled) -> request_type { request_type m; m.method(boost::beast::http::verb::get); @@ -378,7 +385,10 @@ makeRequest( m.insert( "X-Protocol-Ctl", makeFeaturesRequestHeader( - comprEnabled, vpReduceRelayEnabled, ledgerReplayEnabled)); + comprEnabled, + ledgerReplayEnabled, + txReduceRelayEnabled, + vpReduceRelayEnabled)); return m; } @@ -406,8 +416,9 @@ makeResponse( makeFeaturesResponseHeader( req, app.config().COMPRESSION, - app.config().VP_REDUCE_RELAY_ENABLE, - app.config().LEDGER_REPLAY)); + app.config().LEDGER_REPLAY, + app.config().TX_REDUCE_RELAY_ENABLE, + app.config().VP_REDUCE_RELAY_ENABLE)); buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app); diff --git a/src/ripple/overlay/impl/Handshake.h b/src/ripple/overlay/impl/Handshake.h index 089f4ff6b6..2accf5d221 100644 --- a/src/ripple/overlay/impl/Handshake.h +++ b/src/ripple/overlay/impl/Handshake.h @@ -95,16 +95,20 @@ verifyHandshake( @param crawlPublic if true then server's IP/Port are included in crawl @param comprEnabled if true then compression feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled + @param txReduceRelayEnabled if true then transaction reduce-relay feature is + enabled + @param vpReduceRelayEnabled if true then validation/proposal reduce-relay + feature is enabled @return http request with empty body */ request_type makeRequest( bool crawlPublic, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled); + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled); /** Make http response @@ -133,11 +137,15 @@ makeResponse( // The format is: // X-Protocol-Ctl: feature1=value1[,value2]*[\s*;\s*feature2=value1[,value2]*]* // value: \S+ -static constexpr char FEATURE_COMPR[] = "compr"; // compression -static constexpr char FEATURE_VPRR[] = - "vprr"; // validation/proposal reduce-relay -static constexpr char FEATURE_LEDGER_REPLAY[] = - "ledgerreplay"; // ledger replay + +// compression feature +static constexpr char FEATURE_COMPR[] = "compr"; +// validation/proposal reduce-relay feature +static constexpr char FEATURE_VPRR[] = "vprr"; +// transaction reduce-relay feature +static constexpr char FEATURE_TXRR[] = "txrr"; +// ledger replay +static constexpr char FEATURE_LEDGER_REPLAY[] = "ledgerreplay"; static constexpr char DELIM_FEATURE[] = ";"; static constexpr char DELIM_VALUE[] = ","; @@ -210,15 +218,19 @@ peerFeatureEnabled( /** Make request header X-Protocol-Ctl value with supported features @param comprEnabled if true then compression feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled + @param txReduceRelayEnabled if true then transaction reduce-relay feature is + enabled + @param vpReduceRelayEnabled if true then validation/proposal reduce-relay + feature is enabled @return X-Protocol-Ctl header value */ std::string makeFeaturesRequestHeader( bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled); + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled); /** Make response header X-Protocol-Ctl value with supported features. If the request has a feature that we support enabled @@ -226,16 +238,21 @@ makeFeaturesRequestHeader( the response header. @param header request's header @param comprEnabled if true then compression feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled + @param txReduceRelayEnabled if true then transaction reduce-relay feature is + enabled + @param vpReduceRelayEnabled if true then validation/proposal reduce-relay + feature is enabled + @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @return X-Protocol-Ctl header value */ std::string makeFeaturesResponseHeader( http_request_type const& headers, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled); + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled); } // namespace ripple diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index aabd6ab785..b4cb1f192a 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -86,6 +86,7 @@ Message::compress() case protocol::mtVALIDATORLIST: case protocol::mtVALIDATORLISTCOLLECTION: case protocol::mtREPLAY_DELTA_RESPONSE: + case protocol::mtTRANSACTIONS: return true; case protocol::mtPING: case protocol::mtCLUSTER: @@ -100,6 +101,7 @@ Message::compress() case protocol::mtREPLAY_DELTA_REQ: case protocol::mtGET_PEER_SHARD_INFO_V2: case protocol::mtPEER_SHARD_INFO_V2: + case protocol::mtHAVE_TRANSACTIONS: break; } return false; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 27ee59586b..91e37794e1 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -102,6 +103,8 @@ OverlayImpl::Timer::on_timer(error_code ec) overlay_.m_peerFinder->once_per_second(); overlay_.sendEndpoints(); overlay_.autoConnect(); + if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE) + overlay_.sendTxQueue(); if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0) overlay_.deleteIdlePeers(); @@ -137,7 +140,7 @@ OverlayImpl::OverlayImpl( , m_resolver(resolver) , next_id_(1) , timer_count_(0) - , slots_(app, *this) + , slots_(app.logs(), *this) , m_stats( std::bind(&OverlayImpl::collect_metrics, this), collector, @@ -1148,6 +1151,37 @@ OverlayImpl::getActivePeers() const return ret; } +Overlay::PeerSequence +OverlayImpl::getActivePeers( + std::set const& toSkip, + std::size_t& active, + std::size_t& disabled, + std::size_t& enabledInSkip) const +{ + Overlay::PeerSequence ret; + std::lock_guard lock(mutex_); + + active = ids_.size(); + ret.reserve(ids_.size() - toSkip.size()); + + for (auto& [id, w] : ids_) + { + if (auto p = w.lock()) + { + // tx rr feature disabled + if (!p->txReduceRelayEnabled()) + disabled++; + + if (toSkip.count(id) == 0) + ret.emplace_back(std::move(p)); + else if (p->txReduceRelayEnabled()) + enabledInSkip++; + } + } + + return ret; +} + void OverlayImpl::checkTracking(std::uint32_t index) { @@ -1264,6 +1298,67 @@ OverlayImpl::getManifestsMessage() return manifestMessage_; } +void +OverlayImpl::relay( + uint256 const& hash, + protocol::TMTransaction& m, + std::set const& toSkip) +{ + auto const sm = std::make_shared(m, protocol::mtTRANSACTION); + std::size_t total = 0; + std::size_t disabled = 0; + std::size_t enabledInSkip = 0; + + // total peers excluding peers in toSkip + auto peers = getActivePeers(toSkip, total, disabled, enabledInSkip); + auto minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled; + + if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay) + { + for (auto const& p : peers) + p->send(sm); + if (app_.config().TX_REDUCE_RELAY_ENABLE || + app_.config().TX_REDUCE_RELAY_METRICS) + txMetrics_.addMetrics(total, toSkip.size(), 0); + return; + } + + // We have more peers than the minimum (disabled + minimum enabled), + // relay to all disabled and some randomly selected enabled that + // do not have the transaction. + auto enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS + + (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100; + + txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled); + + if (enabledTarget > enabledInSkip) + std::shuffle(peers.begin(), peers.end(), default_prng()); + + JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() + << " selected " << enabledTarget << " skip " + << toSkip.size() << " disabled " << disabled; + + // count skipped peers with the enabled feature towards the quota + std::uint16_t enabledAndRelayed = enabledInSkip; + for (auto const& p : peers) + { + // always relay to a peer with the disabled feature + if (!p->txReduceRelayEnabled()) + { + p->send(sm); + } + else if (enabledAndRelayed < enabledTarget) + { + enabledAndRelayed++; + p->send(sm); + } + else + { + p->addTxQueue(hash); + } + } +} + //------------------------------------------------------------------------------ void @@ -1333,6 +1428,15 @@ OverlayImpl::sendEndpoints() } } +void +OverlayImpl::sendTxQueue() +{ + for_each([](auto const& p) { + if (p->txReduceRelayEnabled()) + p->sendTxQueue(); + }); +} + std::shared_ptr makeSquelchMessage( PublicKey const& validator, diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 764f2a79ac..5f23b9150e 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -128,6 +129,9 @@ private: reduce_relay::Slots slots_; + // Transaction reduce-relay metrics + metrics::TxMetrics txMetrics_; + // A message with the list of manifests we send to peers std::shared_ptr manifestMessage_; // Used to track whether we need to update the cached list of manifests @@ -197,6 +201,22 @@ public: PeerSequence getActivePeers() const override; + /** Get active peers excluding peers in toSkip. + @param toSkip peers to skip + @param active a number of active peers + @param disabled a number of peers with tx reduce-relay + feature disabled + @param enabledInSkip a number of peers with tx reduce-relay + feature enabled and in toSkip + @return active peers less peers in toSkip + */ + PeerSequence + getActivePeers( + std::set const& toSkip, + std::size_t& active, + std::size_t& disabled, + std::size_t& enabledInSkip) const; + void checkTracking(std::uint32_t) override; std::shared_ptr @@ -223,6 +243,12 @@ public: uint256 const& uid, PublicKey const& validator) override; + void + relay( + uint256 const&, + protocol::TMTransaction& m, + std::set const& skip) override; + std::shared_ptr getManifestsMessage(); @@ -411,6 +437,25 @@ public: void deletePeer(Peer::id_t id); + Json::Value + txMetrics() const override + { + return txMetrics_.json(); + } + + /** Add tx reduce-relay metrics. */ + template + void + addTxMetrics(Args... args) + { + if (!strand_.running_in_this_thread()) + return post( + strand_, + std::bind(&OverlayImpl::addTxMetrics, this, args...)); + + txMetrics_.addMetrics(args...); + } + private: void squelch( @@ -518,6 +563,10 @@ private: void sendEndpoints(); + /** Send once a second transactions' hashes aggregated by peers. */ + void + sendTxQueue(); + /** Check if peers stopped relaying messages * and if slots stopped receiving messages from the validator */ void diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 103d21d80a..e6d75bf44d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -108,6 +109,10 @@ PeerImp::PeerImp( app_.config().COMPRESSION) ? Compressed::On : Compressed::Off) + , txReduceRelayEnabled_(peerFeatureEnabled( + headers_, + FEATURE_TXRR, + app_.config().TX_REDUCE_RELAY_ENABLE)) , vpReduceRelayEnabled_(peerFeatureEnabled( headers_, FEATURE_VPRR, @@ -118,11 +123,13 @@ PeerImp::PeerImp( app_.config().LEDGER_REPLAY)) , ledgerReplayMsgHandler_(app, app.getLedgerReplayer()) { - JLOG(journal_.debug()) << " compression enabled " - << (compressionEnabled_ == Compressed::On) - << " vp reduce-relay enabled " - << vpReduceRelayEnabled_ << " on " << remote_address_ - << " " << id_; + JLOG(journal_.info()) << "compression enabled " + << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay enabled " + << vpReduceRelayEnabled_ + << " tx reduce-relay enabled " + << txReduceRelayEnabled_ << " on " << remote_address_ + << " " << id_; } PeerImp::~PeerImp() @@ -285,6 +292,54 @@ PeerImp::send(std::shared_ptr const& m) std::placeholders::_2))); } +void +PeerImp::sendTxQueue() +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this())); + + if (!txQueue_.empty()) + { + protocol::TMHaveTransactions ht; + std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) { + ht.add_hashes(hash.data(), hash.size()); + }); + JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size(); + txQueue_.clear(); + send(std::make_shared(ht, protocol::mtHAVE_TRANSACTIONS)); + } +} + +void +PeerImp::addTxQueue(uint256 const& hash) +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash)); + + if (txQueue_.size() == reduce_relay::MAX_TX_QUEUE_SIZE) + { + JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap"; + sendTxQueue(); + } + + txQueue_.insert(hash); + JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size(); +} + +void +PeerImp::removeTxQueue(uint256 const& hash) +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, + std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash)); + + auto removed = txQueue_.erase(hash); + JLOG(p_journal_.trace()) << "removeTxQueue " << removed; +} + void PeerImp::charge(Resource::Charge const& fee) { @@ -967,8 +1022,25 @@ PeerImp::onMessageBegin( load_event_ = app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type)); fee_ = Resource::feeLightPeer; - overlay_.reportTraffic( - TrafficCount::categorize(*m, type, true), true, static_cast(size)); + auto const category = TrafficCount::categorize(*m, type, true); + overlay_.reportTraffic(category, true, static_cast(size)); + using namespace protocol; + if ((type == MessageType::mtTRANSACTION || + type == MessageType::mtHAVE_TRANSACTIONS || + type == MessageType::mtTRANSACTIONS || + // GET_OBJECTS + category == TrafficCount::category::get_transactions || + // GET_LEDGER + category == TrafficCount::category::ld_tsc_get || + category == TrafficCount::category::ld_tsc_share || + // LEDGER_DATA + category == TrafficCount::category::gl_tsc_share || + category == TrafficCount::category::gl_tsc_get) && + (txReduceRelayEnabled() || app_.config().TX_REDUCE_RELAY_METRICS)) + { + overlay_.addTxMetrics( + static_cast(type), static_cast(size)); + } JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " << uncompressed_size << " " << isCompressed; } @@ -1440,6 +1512,14 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) +{ + handleTransaction(m, true); +} + +void +PeerImp::handleTransaction( + std::shared_ptr const& m, + bool eraseTxQueue) { if (tracking_.load() == Tracking::diverged) return; @@ -1472,6 +1552,11 @@ PeerImp::onMessage(std::shared_ptr const& m) JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; } + // Erase only if the server has seen this tx. If the server has not + // seen this tx then the tx could not has been queued for this peer. + else if (eraseTxQueue && txReduceRelayEnabled()) + removeTxQueue(txID); + return; } @@ -2509,6 +2594,9 @@ PeerImp::onMessage(std::shared_ptr const& m) { protocol::TMGetObjectByHash& packet = *m; + JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type() + << " " << packet.objects_size(); + if (packet.query()) { // this is a query @@ -2524,6 +2612,25 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } + if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS) + { + if (!txReduceRelayEnabled()) + { + JLOG(p_journal_.error()) + << "TMGetObjectByHash: tx reduce-relay is disabled"; + fee_ = Resource::feeInvalidRequest; + return; + } + + std::weak_ptr weak = shared_from_this(); + app_.getJobQueue().addJob( + jtREQUESTED_TXN, "doTransactions", [weak, m](Job&) { + if (auto peer = weak.lock()) + peer->doTransactions(m); + }); + return; + } + fee_ = Resource::feeMediumBurdenPeer; protocol::TMGetObjectByHash reply; @@ -2644,6 +2751,98 @@ PeerImp::onMessage(std::shared_ptr const& m) } } +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (!txReduceRelayEnabled()) + { + JLOG(p_journal_.error()) + << "TMHaveTransactions: tx reduce-relay is disabled"; + fee_ = Resource::feeInvalidRequest; + return; + } + + std::weak_ptr weak = shared_from_this(); + app_.getJobQueue().addJob( + jtMISSING_TXN, "handleHaveTransactions", [weak, m](Job&) { + if (auto peer = weak.lock()) + peer->handleHaveTransactions(m); + }); +} + +void +PeerImp::handleHaveTransactions( + std::shared_ptr const& m) +{ + protocol::TMGetObjectByHash tmBH; + tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS); + tmBH.set_query(true); + + JLOG(p_journal_.trace()) + << "received TMHaveTransactions " << m->hashes_size(); + + for (std::uint32_t i = 0; i < m->hashes_size(); i++) + { + if (!stringIsUint256Sized(m->hashes(i))) + { + JLOG(p_journal_.error()) + << "TMHaveTransactions with invalid hash size"; + fee_ = Resource::feeInvalidRequest; + return; + } + + uint256 hash(m->hashes(i)); + + auto txn = app_.getMasterTransaction().fetch_from_cache(hash); + + JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn; + + if (!txn) + { + JLOG(p_journal_.debug()) << "adding transaction to request"; + + auto obj = tmBH.add_objects(); + obj->set_hash(hash.data(), hash.size()); + } + else + { + // Erase only if a peer has seen this tx. If the peer has not + // seen this tx then the tx could not has been queued for this + // peer. + removeTxQueue(hash); + } + } + + JLOG(p_journal_.trace()) + << "transaction request object is " << tmBH.objects_size(); + + if (tmBH.objects_size() > 0) + send(std::make_shared(tmBH, protocol::mtGET_OBJECTS)); +} + +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (!txReduceRelayEnabled()) + { + JLOG(p_journal_.error()) + << "TMTransactions: tx reduce-relay is disabled"; + fee_ = Resource::feeInvalidRequest; + return; + } + + JLOG(p_journal_.trace()) + << "received TMTransactions " << m->transactions_size(); + + overlay_.addTxMetrics(m->transactions_size()); + + for (std::uint32_t i = 0; i < m->transactions_size(); ++i) + handleTransaction( + std::shared_ptr( + m->mutable_transactions(i), [](protocol::TMTransaction*) {}), + false); +} + void PeerImp::onMessage(std::shared_ptr const& m) { @@ -2740,6 +2939,61 @@ PeerImp::doFetchPack(const std::shared_ptr& packet) }); } +void +PeerImp::doTransactions( + std::shared_ptr const& packet) +{ + protocol::TMTransactions reply; + + JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx " + << packet->objects_size(); + + if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE) + { + JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes"; + fee_ = Resource::feeInvalidRequest; + return; + } + + for (std::uint32_t i = 0; i < packet->objects_size(); ++i) + { + auto const& obj = packet->objects(i); + + if (!stringIsUint256Sized(obj.hash())) + { + fee_ = Resource::feeInvalidRequest; + return; + } + + uint256 hash(obj.hash()); + + auto txn = app_.getMasterTransaction().fetch_from_cache(hash); + + if (!txn) + { + JLOG(p_journal_.error()) << "doTransactions, transaction not found " + << Slice(hash.data(), hash.size()); + fee_ = Resource::feeInvalidRequest; + return; + } + + Serializer s; + auto tx = reply.add_transactions(); + auto sttx = txn->getSTransaction(); + sttx->add(s); + tx->set_rawtransaction(s.data(), s.size()); + tx->set_status( + txn->getStatus() == INCLUDED ? protocol::tsCURRENT + : protocol::tsNEW); + tx->set_receivetimestamp( + app_.timeKeeper().now().time_since_epoch().count()); + tx->set_deferred(txn->getSubmitResult().queued); + } + + if (reply.transactions_size() > 0) + send(std::make_shared(reply, protocol::mtTRANSACTIONS)); +} + void PeerImp::checkTransaction( int flags, diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 9c511000c7..f7bfbbc9d6 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -167,6 +168,13 @@ private: std::mutex mutable shardInfoMutex_; Compressed compressionEnabled_ = Compressed::Off; + + // Queue of transactions' hashes that have not been + // relayed. The hashes are sent once a second to a peer + // and the peer requests missing transactions from the node. + hash_set txQueue_; + // true if tx reduce-relay feature is enabled on the peer. + bool txReduceRelayEnabled_ = false; // true if validation/proposal reduce-relay feature is enabled // on the peer. bool vpReduceRelayEnabled_ = false; @@ -255,7 +263,7 @@ public: } // Work-around for calling shared_from_this in constructors - void + virtual void run(); // Called when Overlay gets a stop request. @@ -269,6 +277,22 @@ public: void send(std::shared_ptr const& m) override; + /** Send aggregated transactions' hashes */ + void + sendTxQueue() override; + + /** Add transaction's hash to the transactions' hashes queue + @param hash transaction's hash + */ + void + addTxQueue(uint256 const& hash) override; + + /** Remove transaction's hash from the transactions' hashes queue + @param hash transaction's hash + */ + void + removeTxQueue(uint256 const& hash) override; + /** Send a set of PeerFinder endpoints as a protocol message. */ template < class FwdIt, @@ -401,6 +425,12 @@ public: return compressionEnabled_ == Compressed::On; } + bool + txReduceRelayEnabled() const override + { + return txReduceRelayEnabled_; + } + private: void close(); @@ -453,6 +483,29 @@ private: void onWriteMessage(error_code ec, std::size_t bytes_transferred); + /** Called from onMessage(TMTransaction(s)). + @param m Transaction protocol message + @param eraseTxQueue is true when called from onMessage(TMTransaction) + and is false when called from onMessage(TMTransactions). If true then + the transaction hash is erased from txQueue_. Don't need to erase from + the queue when called from onMessage(TMTransactions) because this + message is a response to the missing transactions request and the queue + would not have any of these transactions. + */ + void + handleTransaction( + std::shared_ptr const& m, + bool eraseTxQueue); + + /** Handle protocol message with hashes of transactions that have not + been relayed by an upstream node down to its peers - request + transactions, which have not been relayed to this peer. + @param m protocol message with transactions' hashes + */ + void + handleHaveTransactions( + std::shared_ptr const& m); + // Check if reduce-relay feature is enabled and // reduce_relay::WAIT_ON_BOOTUP time passed since the start bool @@ -518,6 +571,10 @@ public: void onMessage(std::shared_ptr const& m); void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); @@ -547,6 +604,13 @@ private: std::uint32_t version, std::vector const& blobs); + /** Process peer's request to send missing transactions. The request is + sent in response to TMHaveTransactions. + @param packet protocol message containing missing transactions' hashes. + */ + void + doTransactions(std::shared_ptr const& packet); + void checkTransaction( int flags, @@ -628,6 +692,10 @@ PeerImp::PeerImp( app_.config().COMPRESSION) ? Compressed::On : Compressed::Off) + , txReduceRelayEnabled_(peerFeatureEnabled( + headers_, + FEATURE_TXRR, + app_.config().TX_REDUCE_RELAY_ENABLE)) , vpReduceRelayEnabled_(peerFeatureEnabled( headers_, FEATURE_VPRR, @@ -640,11 +708,13 @@ PeerImp::PeerImp( { read_buffer_.commit(boost::asio::buffer_copy( read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers)); - JLOG(journal_.debug()) << "compression enabled " - << (compressionEnabled_ == Compressed::On) - << " vp reduce-relay enabled " - << vpReduceRelayEnabled_ << " on " << remote_address_ - << " " << id_; + JLOG(journal_.info()) << "compression enabled " + << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay enabled " + << vpReduceRelayEnabled_ + << " tx reduce-relay enabled " + << txReduceRelayEnabled_ << " on " << remote_address_ + << " " << id_; } template diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index bbf7c1e16c..09861353a1 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -95,6 +95,10 @@ protocolMessageName(int type) return "peer_shard_info"; case protocol::mtGET_OBJECTS: return "get_objects"; + case protocol::mtHAVE_TRANSACTIONS: + return "have_transactions"; + case protocol::mtTRANSACTIONS: + return "transactions"; case protocol::mtSQUELCH: return "squelch"; case protocol::mtPROOF_PATH_REQ: @@ -453,6 +457,14 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtHAVE_TRANSACTIONS: + success = detail::invoke( + *header, buffers, handler); + break; + case protocol::mtTRANSACTIONS: + success = detail::invoke( + *header, buffers, handler); + break; case protocol::mtSQUELCH: success = detail::invoke(*header, buffers, handler); diff --git a/src/ripple/overlay/impl/TrafficCount.cpp b/src/ripple/overlay/impl/TrafficCount.cpp index 5ae5abc3bf..9b35d47683 100644 --- a/src/ripple/overlay/impl/TrafficCount.cpp +++ b/src/ripple/overlay/impl/TrafficCount.cpp @@ -138,6 +138,9 @@ TrafficCount::categorize( ? TrafficCount::category::share_fetch_pack : TrafficCount::category::get_fetch_pack; + if (msg->type() == protocol::TMGetObjectByHash::otTRANSACTIONS) + return TrafficCount::category::get_transactions; + return (msg->query() == inbound) ? TrafficCount::category::share_hash : TrafficCount::category::get_hash; } @@ -154,6 +157,12 @@ TrafficCount::categorize( if (type == protocol::mtREPLAY_DELTA_RESPONSE) return TrafficCount::category::replay_delta_response; + if (type == protocol::mtHAVE_TRANSACTIONS) + return TrafficCount::category::have_transactions; + + if (type == protocol::mtTRANSACTIONS) + return TrafficCount::category::requested_transactions; + return TrafficCount::category::unknown; } diff --git a/src/ripple/overlay/impl/TrafficCount.h b/src/ripple/overlay/impl/TrafficCount.h index fbb3fb9ed1..9e212da791 100644 --- a/src/ripple/overlay/impl/TrafficCount.h +++ b/src/ripple/overlay/impl/TrafficCount.h @@ -136,6 +136,9 @@ public: share_fetch_pack, get_fetch_pack, + // TMGetObjectByHash: transactions + get_transactions, + // TMGetObjectByHash: generic share_hash, get_hash, @@ -148,6 +151,12 @@ public: replay_delta_request, replay_delta_response, + // TMHaveTransactions + have_transactions, + + // TMTransactions + requested_transactions, + unknown // must be last }; @@ -230,13 +239,16 @@ protected: {"getobject_CAS_get"}, // category::get_cas_object {"getobject_Fetch_Pack_share"}, // category::share_fetch_pack {"getobject_Fetch Pack_get"}, // category::get_fetch_pack + {"getobject_Transactions_get"}, // category::get_transactions {"getobject_share"}, // category::share_hash {"getobject_get"}, // category::get_hash {"proof_path_request"}, // category::proof_path_request - {"proof_path_response"}, // category::proof_path_response - {"replay_delta_request"}, // category::replay_delta_request - {"replay_delta_response"}, // category::replay_delta_response - {"unknown"} // category::unknown + {"proof_path_response"}, // category::proof_path_response + {"replay_delta_request"}, // category::replay_delta_request + {"replay_delta_response"}, // category::replay_delta_response + {"have_transactions"}, // category::have_transactions + {"requested_transactions"}, // category::transactions + {"unknown"} // category::unknown }}; }; diff --git a/src/ripple/overlay/impl/TxMetrics.cpp b/src/ripple/overlay/impl/TxMetrics.cpp new file mode 100644 index 0000000000..f77f746b3d --- /dev/null +++ b/src/ripple/overlay/impl/TxMetrics.cpp @@ -0,0 +1,151 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "ripple/overlay/impl/TxMetrics.h" +#include "ripple/protocol/jss.h" + +#include + +namespace ripple { + +namespace metrics { + +void +TxMetrics::addMetrics(protocol::MessageType type, std::uint32_t val) +{ + auto add = [&](auto& m, std::uint32_t val) { + std::lock_guard lock(mutex); + m.addMetrics(val); + }; + + switch (type) + { + case protocol::MessageType::mtTRANSACTION: + add(tx, val); + break; + case protocol::MessageType::mtHAVE_TRANSACTIONS: + add(haveTx, val); + break; + case protocol::MessageType::mtGET_LEDGER: + add(getLedger, val); + break; + case protocol::MessageType::mtLEDGER_DATA: + add(ledgerData, val); + break; + case protocol::MessageType::mtTRANSACTIONS: + add(transactions, val); + break; + default: + return; + } +} + +void +TxMetrics::addMetrics( + std::uint32_t selected, + std::uint32_t suppressed, + std::uint32_t notenabled) +{ + std::lock_guard lock(mutex); + selectedPeers.addMetrics(selected); + suppressedPeers.addMetrics(suppressed); + notEnabled.addMetrics(notenabled); +} + +void +TxMetrics::addMetrics(std::uint32_t missing) +{ + std::lock_guard lock(mutex); + missingTx.addMetrics(missing); +} + +void +MultipleMetrics::addMetrics(std::uint32_t val2) +{ + addMetrics(1, val2); +} + +void +MultipleMetrics::addMetrics(std::uint32_t val1, std::uint32_t val2) +{ + m1.addMetrics(val1); + m2.addMetrics(val2); +} + +void +SingleMetrics::addMetrics(std::uint32_t val) +{ + using namespace std::chrono_literals; + accum += val; + N++; + auto const timeElapsed = clock_type::now() - intervalStart; + auto const timeElapsedInSecs = + std::chrono::duration_cast(timeElapsed); + + if (timeElapsedInSecs >= 1s) + { + auto const avg = accum / (perTimeUnit ? timeElapsedInSecs.count() : N); + rollingAvgAggreg.push_back(avg); + + auto const total = std::accumulate( + rollingAvgAggreg.begin(), rollingAvgAggreg.end(), 0ull); + rollingAvg = total / rollingAvgAggreg.size(); + + intervalStart = clock_type::now(); + accum = 0; + N = 0; + } +} + +Json::Value +TxMetrics::json() const +{ + std::lock_guard l(mutex); + + Json::Value ret(Json::objectValue); + + ret[jss::txr_tx_cnt] = std::to_string(tx.m1.rollingAvg); + ret[jss::txr_tx_sz] = std::to_string(tx.m2.rollingAvg); + + ret[jss::txr_have_txs_cnt] = std::to_string(haveTx.m1.rollingAvg); + ret[jss::txr_have_txs_sz] = std::to_string(haveTx.m2.rollingAvg); + + ret[jss::txr_get_ledger_cnt] = std::to_string(getLedger.m1.rollingAvg); + ret[jss::txr_get_ledger_sz] = std::to_string(getLedger.m2.rollingAvg); + + ret[jss::txr_ledger_data_cnt] = std::to_string(ledgerData.m1.rollingAvg); + ret[jss::txr_ledger_data_sz] = std::to_string(ledgerData.m2.rollingAvg); + + ret[jss::txr_transactions_cnt] = std::to_string(transactions.m1.rollingAvg); + ret[jss::txr_transactions_sz] = std::to_string(transactions.m2.rollingAvg); + + ret[jss::txr_selected_cnt] = std::to_string(selectedPeers.rollingAvg); + + ret[jss::txr_suppressed_cnt] = std::to_string(suppressedPeers.rollingAvg); + + ret[jss::txr_not_enabled_cnt] = std::to_string(notEnabled.rollingAvg); + + ret[jss::txr_missing_tx_freq] = std::to_string(missingTx.rollingAvg); + + return ret; +} + +} // namespace metrics + +} // namespace ripple diff --git a/src/ripple/overlay/impl/TxMetrics.h b/src/ripple/overlay/impl/TxMetrics.h new file mode 100644 index 0000000000..a37d7e7a48 --- /dev/null +++ b/src/ripple/overlay/impl/TxMetrics.h @@ -0,0 +1,141 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_TXMETRICS_H_INCLUDED +#define RIPPLE_OVERLAY_TXMETRICS_H_INCLUDED + +#include "ripple/json/json_value.h" +#include "ripple/protocol/messages.h" + +#include + +#include +#include + +namespace ripple { + +namespace metrics { + +/** Run single metrics rolling average. Can be either average of a value + per second or average of a value's sample per second. For instance, + for transaction it makes sense to have transaction bytes and count + per second, but for a number of selected peers to relay per transaction + it makes sense to have sample's average. + */ +struct SingleMetrics +{ + /** Class constructor + @param ptu if true then calculate metrics per second, otherwise + sample's average + */ + SingleMetrics(bool ptu = true) : perTimeUnit(ptu) + { + } + using clock_type = std::chrono::steady_clock; + clock_type::time_point intervalStart{clock_type::now()}; + std::uint64_t accum{0}; + std::uint64_t rollingAvg{0}; + std::uint32_t N{0}; + bool perTimeUnit{true}; + boost::circular_buffer rollingAvgAggreg{30, 0ull}; + /** Add metrics value + * @param val metrics value, either bytes or count + */ + void + addMetrics(std::uint32_t val); +}; + +/** Run two metrics. For instance message size and count for + protocol messages. */ +struct MultipleMetrics +{ + MultipleMetrics(bool ptu1 = true, bool ptu2 = true) : m1(ptu1), m2(ptu2) + { + } + + SingleMetrics m1; + SingleMetrics m2; + /** Add metrics to m2. m1 in this case aggregates the frequency. + @param val2 m2 metrics value + */ + void + addMetrics(std::uint32_t val2); + /** Add metrics to m1 and m2. + * @param val1 m1 metrics value + * @param val2 m2 metrics value + */ + void + addMetrics(std::uint32_t val1, std::uint32_t val2); +}; + +/** Run transaction reduce-relay feature related metrics */ +struct TxMetrics +{ + mutable std::mutex mutex; + // TMTransaction bytes and count per second + MultipleMetrics tx; + // TMHaveTransactions bytes and count per second + MultipleMetrics haveTx; + // TMGetLedger bytes and count per second + MultipleMetrics getLedger; + // TMLedgerData bytes and count per second + MultipleMetrics ledgerData; + // TMTransactions bytes and count per second + MultipleMetrics transactions; + // Peers selected to relay in each transaction sample average + SingleMetrics selectedPeers{false}; + // Peers suppressed to relay in each transaction sample average + SingleMetrics suppressedPeers{false}; + // Peers with tx reduce-relay feature not enabled + SingleMetrics notEnabled{false}; + // TMTransactions number of transactions count per second + SingleMetrics missingTx; + /** Add protocol message metrics + @param type protocol message type + @param val message size in bytes + */ + void + addMetrics(protocol::MessageType type, std::uint32_t val); + /** Add peers selected for relaying and suppressed peers metrics. + @param selected number of selected peers to relay + @param suppressed number of suppressed peers + @param notEnabled number of peers with tx reduce-relay featured disabled + */ + void + addMetrics( + std::uint32_t selected, + std::uint32_t suppressed, + std::uint32_t notEnabled); + /** Add number of missing transactions that a node requested + @param missing number of missing transactions + */ + void + addMetrics(std::uint32_t missing); + /** Get json representation of the metrics + @return json object + */ + Json::Value + json() const; +}; + +} // namespace metrics + +} // namespace ripple + +#endif \ No newline at end of file diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 5306aee7b7..74cbfe8f6c 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -31,6 +31,8 @@ enum MessageType mtREPLAY_DELTA_RESPONSE = 60; mtGET_PEER_SHARD_INFO_V2 = 61; mtPEER_SHARD_INFO_V2 = 62; + mtHAVE_TRANSACTIONS = 63; + mtTRANSACTIONS = 64; } // token, iterations, target, challenge = issue demand for proof of work @@ -176,6 +178,11 @@ message TMTransaction optional bool deferred = 4; // not applied to open ledger } +message TMTransactions +{ + repeated TMTransaction transactions = 1; +} + enum NodeStatus { @@ -316,6 +323,7 @@ message TMGetObjectByHash otSTATE_NODE = 4; otCAS_OBJECT = 5; otFETCH_PACK = 6; + otTRANSACTIONS = 7; } required ObjectType type = 1; @@ -437,3 +445,8 @@ message TMReplayDeltaResponse optional TMReplyError error = 4; } +message TMHaveTransactions +{ + repeated bytes hashes = 1; +} + diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 1b0c4793aa..f1218474ae 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -570,6 +570,20 @@ JSS(tx_json); // in/out: TransactionSign JSS(tx_signing_hash); // out: TransactionSign JSS(tx_unsigned); // out: TransactionSign JSS(txn_count); // out: NetworkOPs +JSS(txr_tx_cnt); // out: protocol message tx's count +JSS(txr_tx_sz); // out: protocol message tx's size +JSS(txr_have_txs_cnt); // out: protocol message have tx count +JSS(txr_have_txs_sz); // out: protocol message have tx size +JSS(txr_get_ledger_cnt); // out: protocol message get ledger count +JSS(txr_get_ledger_sz); // out: protocol message get ledger size +JSS(txr_ledger_data_cnt); // out: protocol message ledger data count +JSS(txr_ledger_data_sz); // out: protocol message ledger data size +JSS(txr_transactions_cnt); // out: protocol message get object count +JSS(txr_transactions_sz); // out: protocol message get object size +JSS(txr_selected_cnt); // out: selected peers count +JSS(txr_suppressed_cnt); // out: suppressed peers count +JSS(txr_not_enabled_cnt); // out: peers with tx reduce-relay disabled count +JSS(txr_missing_tx_freq); // out: missing tx frequency average JSS(txs); // out: TxHistory JSS(type); // in: AccountObjects // out: NetworkOPs diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index ec8a0ff355..5fb42268f5 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -141,6 +141,8 @@ doTxJson(RPC::JsonContext&); Json::Value doTxHistory(RPC::JsonContext&); Json::Value +doTxReduceRelay(RPC::JsonContext&); +Json::Value doUnlList(RPC::JsonContext&); Json::Value doUnsubscribe(RPC::JsonContext&); diff --git a/src/ripple/rpc/handlers/TxReduceRelay.cpp b/src/ripple/rpc/handlers/TxReduceRelay.cpp new file mode 100644 index 0000000000..bb883b8dea --- /dev/null +++ b/src/ripple/rpc/handlers/TxReduceRelay.cpp @@ -0,0 +1,33 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +namespace ripple { + +Json::Value +doTxReduceRelay(RPC::JsonContext& context) +{ + return context.app.overlay().txMetrics(); +} + +} // namespace ripple diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp index c4360b7a5b..6e313931f8 100644 --- a/src/ripple/rpc/impl/Handler.cpp +++ b/src/ripple/rpc/impl/Handler.cpp @@ -147,6 +147,7 @@ Handler const handlerArray[]{ {"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION}, {"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION}, {"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION}, + {"tx_reduce_relay", byRef(&doTxReduceRelay), Role::USER, NO_CONDITION}, {"unl_list", byRef(&doUnlList), Role::ADMIN, NO_CONDITION}, {"validation_create", byRef(&doValidationCreate), diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index 7e1ecf9024..549495d40b 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -289,6 +289,23 @@ public: { return false; } + void + sendTxQueue() override + { + } + void + addTxQueue(const uint256&) override + { + } + void + removeTxQueue(const uint256&) override + { + } + bool + txReduceRelayEnabled() const override + { + return false; + } bool ledgerReplayEnabled_; }; @@ -1060,7 +1077,8 @@ struct LedgerReplayer_test : public beast::unit_test::suite { testcase("handshake test"); auto handshake = [&](bool client, bool server, bool expecting) -> bool { - auto request = ripple::makeRequest(true, false, false, client); + auto request = + ripple::makeRequest(true, false, client, false, false); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/compression_test.cpp b/src/test/overlay/compression_test.cpp index 66155da20b..e24cb09f96 100644 --- a/src/test/overlay/compression_test.cpp +++ b/src/test/overlay/compression_test.cpp @@ -488,8 +488,9 @@ public: auto request = ripple::makeRequest( true, env->app().config().COMPRESSION, - env->app().config().VP_REDUCE_RELAY_ENABLE, - false); + false, + env->app().config().TX_REDUCE_RELAY_ENABLE, + env->app().config().VP_REDUCE_RELAY_ENABLE); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index dec1d87988..1839220dc4 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -159,6 +159,23 @@ public: { return false; } + bool + txReduceRelayEnabled() const override + { + return false; + } + void + sendTxQueue() override + { + } + void + addTxQueue(const uint256&) override + { + } + void + removeTxQueue(const uint256&) override + { + } }; /** Manually advanced clock. */ @@ -491,7 +508,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler public: using id_t = Peer::id_t; using clock_type = ManualClock; - OverlaySim(Application& app) : slots_(app, *this), app_(app) + OverlaySim(Application& app) : slots_(app.logs(), *this), logs_(app.logs()) { } @@ -545,7 +562,7 @@ public: Peer::id_t id; if (peersCache_.empty() || !useCache) { - peer = std::make_shared(*this, app_.journal("Squelch")); + peer = std::make_shared(*this, logs_.journal("Squelch")); id = peer->id(); } else @@ -665,7 +682,7 @@ private: Peers peers_; Peers peersCache_; reduce_relay::Slots slots_; - Application& app_; + Logs& logs_; }; class Network @@ -1398,7 +1415,8 @@ vp_squelched=1 auto run = [&](int npeers) { handler.maxDuration_ = 0; - reduce_relay::Slots slots(env_.app(), handler); + reduce_relay::Slots slots( + env_.app().logs(), handler); // 1st message from a new peer switches the slot // to counting state and resets the counts of all peers + // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold @@ -1494,8 +1512,9 @@ vp_squelched=1 auto request = ripple::makeRequest( true, env_.app().config().COMPRESSION, - env_.app().config().VP_REDUCE_RELAY_ENABLE, - false); + false, + env_.app().config().TX_REDUCE_RELAY_ENABLE, + env_.app().config().VP_REDUCE_RELAY_ENABLE); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/tx_reduce_relay_test.cpp b/src/test/overlay/tx_reduce_relay_test.cpp new file mode 100644 index 0000000000..7b26ad0962 --- /dev/null +++ b/src/test/overlay/tx_reduce_relay_test.cpp @@ -0,0 +1,275 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== +#include +#include +#include +#include +#include +#include + +namespace ripple { + +namespace test { + +class tx_reduce_relay_test : public beast::unit_test::suite +{ +public: + using socket_type = boost::asio::ip::tcp::socket; + using middle_type = boost::beast::tcp_stream; + using stream_type = boost::beast::ssl_stream; + using shared_context = std::shared_ptr; + +private: + void + doTest(const std::string& msg, bool log, std::function f) + { + testcase(msg); + f(log); + } + + void + testConfig(bool log) + { + doTest("Config Test", log, [&](bool log) { + auto test = [&](bool enable, + bool metrics, + std::uint16_t min, + std::uint16_t pct, + bool success = true) { + std::stringstream str("[reduce_relay]"); + str << "[reduce_relay]\n" + << "tx_enable=" << static_cast(enable) << "\n" + << "tx_metrics=" << static_cast(metrics) << "\n" + << "tx_min_peers=" << min << "\n" + << "tx_relay_percentage=" << pct << "\n"; + Config c; + try + { + c.loadFromString(str.str()); + + BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable); + BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics); + BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min); + BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct); + if (success) + pass(); + else + fail(); + } + catch (...) + { + if (success) + fail(); + else + pass(); + } + }; + + test(true, true, 20, 25); + test(false, false, 20, 25); + test(false, false, 20, 0, false); + test(false, false, 20, 101, false); + test(false, false, 9, 10, false); + test(false, false, 10, 9, false); + }); + } + + class PeerTest : public PeerImp + { + public: + PeerTest( + Application& app, + std::shared_ptr const& slot, + http_request_type&& request, + PublicKey const& publicKey, + ProtocolVersion protocol, + Resource::Consumer consumer, + std::unique_ptr&& stream_ptr, + OverlayImpl& overlay) + : PeerImp( + app, + sid_, + slot, + std::move(request), + publicKey, + protocol, + consumer, + std::move(stream_ptr), + overlay) + { + sid_++; + } + ~PeerTest() = default; + + void + run() override + { + } + void + send(std::shared_ptr const&) override + { + sendTx_++; + } + void + addTxQueue(const uint256& hash) override + { + queueTx_++; + } + static void + init() + { + queueTx_ = 0; + sendTx_ = 0; + sid_ = 0; + } + inline static std::size_t sid_ = 0; + inline static std::uint16_t queueTx_ = 0; + inline static std::uint16_t sendTx_ = 0; + }; + + std::uint16_t lid_{0}; + std::uint16_t rid_{1}; + shared_context context_; + ProtocolVersion protocolVersion_; + boost::beast::multi_buffer read_buf_; + +public: + tx_reduce_relay_test() + : context_(make_SSLContext("")), protocolVersion_{1, 7} + { + } + +private: + void + addPeer( + jtx::Env& env, + std::vector>& peers, + std::uint16_t& nDisabled) + { + auto& overlay = dynamic_cast(env.app().overlay()); + boost::beast::http::request request; + (nDisabled == 0) + ? (void)request.insert( + "X-Protocol-Ctl", + makeFeaturesRequestHeader(false, false, true, false)) + : (void)nDisabled--; + auto stream_ptr = std::make_unique( + socket_type(std::forward( + env.app().getIOService())), + *context_); + beast::IP::Endpoint local( + beast::IP::Address::from_string("172.1.1." + std::to_string(lid_))); + beast::IP::Endpoint remote( + beast::IP::Address::from_string("172.1.1." + std::to_string(rid_))); + PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519))); + auto consumer = overlay.resourceManager().newInboundEndpoint(remote); + auto slot = overlay.peerFinder().new_inbound_slot(local, remote); + auto const peer = std::make_shared( + env.app(), + slot, + std::move(request), + key, + protocolVersion_, + consumer, + std::move(stream_ptr), + overlay); + overlay.add_active(peer); + peers.emplace_back(peer); // overlay stores week ptr to PeerImp + lid_ += 2; + rid_ += 2; + assert(lid_ <= 254); + } + + void + testRelay( + std::string const& test, + bool txRREnabled, + std::uint16_t nPeers, + std::uint16_t nDisabled, + std::uint16_t minPeers, + std::uint16_t relayPercentage, + std::uint16_t expectRelay, + std::uint16_t expectQueue, + std::set const& toSkip = {}) + { + testcase(test); + jtx::Env env(*this); + std::vector> peers; + env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled; + env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers; + env.app().config().TX_RELAY_PERCENTAGE = relayPercentage; + PeerTest::init(); + lid_ = 0; + rid_ = 0; + for (int i = 0; i < nPeers; i++) + addPeer(env, peers, nDisabled); + protocol::TMTransaction m; + m.set_rawtransaction("transaction"); + m.set_deferred(false); + m.set_status(protocol::TransactionStatus::tsNEW); + env.app().overlay().relay(uint256{0}, m, toSkip); + BEAST_EXPECT( + PeerTest::sendTx_ == expectRelay && + PeerTest::queueTx_ == expectQueue); + } + + void + run() override + { + bool log = false; + std::set skip = {0, 1, 2, 3, 4}; + testConfig(log); + // relay to all peers, no hash queue + testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0); + // relay to nPeers - skip (10-5=5) + testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip); + // relay to all peers because min is greater than nPeers + testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0); + // relay to all peers because min + disabled is greater thant nPeers + testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0); + // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30), + // queue the rest (30) + testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30); + // relay to minPeers + 25% of (nPeers - nPeers) - skip + // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed + // (60-25-5=30) + testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip); + // relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed) + // (20+10+0.25*(70-20-10)=40), queue the rest (30) + testRelay("disabled", true, 70, 10, 20, 25, 40, 30); + // relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers + // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts + // towards relayed (70-35-5=30)) + testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip); + // relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled) + // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts + // towards relayed (15-5-10=0) + skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip); + // relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled) + // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts + // towards relayed (20-14=6) + skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}; + testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip); + } +}; + +BEAST_DEFINE_TESTSUITE(tx_reduce_relay, ripple_data, ripple); +} // namespace test +} // namespace ripple \ No newline at end of file