diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index a585770d52..26f83fb798 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -535,6 +535,7 @@ target_sources (rippled PRIVATE src/ripple/overlay/impl/PeerSet.cpp src/ripple/overlay/impl/ProtocolVersion.cpp src/ripple/overlay/impl/TrafficCount.cpp + src/ripple/overlay/impl/TxMetrics.cpp #[===============================[ main sources: subdir: peerfinder @@ -613,6 +614,7 @@ target_sources (rippled PRIVATE src/ripple/rpc/handlers/TransactionEntry.cpp src/ripple/rpc/handlers/Tx.cpp src/ripple/rpc/handlers/TxHistory.cpp + src/ripple/rpc/handlers/TxReduceRelay.cpp src/ripple/rpc/handlers/UnlList.cpp src/ripple/rpc/handlers/Unsubscribe.cpp src/ripple/rpc/handlers/ValidationCreate.cpp @@ -864,6 +866,7 @@ target_sources (rippled PRIVATE src/test/overlay/compression_test.cpp src/test/overlay/reduce_relay_test.cpp src/test/overlay/handshake_test.cpp + src/test/overlay/tx_reduce_relay_test.cpp #[===============================[ test sources: subdir: peerfinder diff --git a/Builds/levelization/results/ordering.txt b/Builds/levelization/results/ordering.txt index 8efdd690c8..de0d6f7c09 100644 --- a/Builds/levelization/results/ordering.txt +++ b/Builds/levelization/results/ordering.txt @@ -163,6 +163,7 @@ test.overlay > ripple.basics test.overlay > ripple.beast test.overlay > ripple.core test.overlay > ripple.overlay +test.overlay > ripple.peerfinder test.overlay > ripple.protocol test.overlay > ripple.shamap test.overlay > test.jtx diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 11c5ff6c30..859c66d315 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -187,8 +187,8 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) msg.set_status(protocol::tsNEW); msg.set_receivetimestamp( app_.timeKeeper().now().time_since_epoch().count()); - app_.overlay().foreach(send_always( - std::make_shared(msg, protocol::mtTRANSACTION))); + static std::set skip{}; + app_.overlay().relay(tx.id(), msg, skip); } else { diff --git a/src/ripple/app/ledger/impl/OpenLedger.cpp b/src/ripple/app/ledger/impl/OpenLedger.cpp index 463e5c880a..113c46951a 100644 --- a/src/ripple/app/ledger/impl/OpenLedger.cpp +++ b/src/ripple/app/ledger/impl/OpenLedger.cpp @@ -150,9 +150,7 @@ OpenLedger::accept( msg.set_status(protocol::tsNEW); msg.set_receivetimestamp( app.timeKeeper().now().time_since_epoch().count()); - app.overlay().foreach(send_if_not( - std::make_shared(msg, protocol::mtTRANSACTION), - peer_in_set(*toSkip))); + app.overlay().relay(txId, msg, *toSkip); } } diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index b29133b1f6..ab96822359 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1342,9 +1342,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) app_.timeKeeper().now().time_since_epoch().count()); tx.set_deferred(e.result == terQUEUED); // FIXME: This should be when we received it - app_.overlay().foreach(send_if_not( - std::make_shared(tx, protocol::mtTRANSACTION), - peer_in_set(*toSkip))); + app_.overlay().relay(e.transaction->getID(), tx, *toSkip); e.transaction->setBroadcast(); } } diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 146a40a796..30117ad13d 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -217,6 +217,21 @@ public: // Set log level to debug so that the feature function can be // analyzed. bool VP_REDUCE_RELAY_SQUELCH = false; + // Transaction reduce-relay feature + bool TX_REDUCE_RELAY_ENABLE = false; + // If tx reduce-relay feature is disabled + // and this flag is enabled then some + // tx-related metrics is collected. It + // is ignored if tx reduce-relay feature is + // enabled. It is used in debugging to compare + // metrics with the feature disabled/enabled. + bool TX_REDUCE_RELAY_METRICS = false; + // Minimum peers a server should have before + // selecting random peers + std::size_t TX_REDUCE_RELAY_MIN_PEERS = 20; + // Percentage of peers with the tx reduce-relay feature enabled + // to relay to out of total active peers + std::size_t TX_RELAY_PERCENTAGE = 25; // These override the command line client settings std::optional rpc_ip; diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index a5fbf5413e..15552be70c 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -52,6 +52,8 @@ enum JobType { jtRPC, // A websocket command from the client jtUPDATE_PF, // Update pathfinding requests jtTRANSACTION, // A transaction received from the network + jtMISSING_TXN, // Request missing transactions + jtREQUESTED_TXN, // Reply with requested transactions jtBATCH, // Apply batched transactions jtADVANCE, // Advance validated/acquired ledgers jtPUBLEDGER, // Publish a fully-accepted ledger diff --git a/src/ripple/core/JobTypes.h b/src/ripple/core/JobTypes.h index 3cd7794f6f..45f69a5308 100644 --- a/src/ripple/core/JobTypes.h +++ b/src/ripple/core/JobTypes.h @@ -84,6 +84,8 @@ private: add(jtNETOP_CLUSTER, "clusterReport", 1, false, 9999ms, 9999ms); add(jtNETOP_TIMER, "heartbeat", 1, false, 999ms, 999ms); add(jtADMIN, "administration", maxLimit, false, 0ms, 0ms); + add(jtMISSING_TXN, "handleHaveTransactions", 1200, false, 0ms, 0ms); + add(jtREQUESTED_TXN, "doTransactions", 1200, false, 0ms, 0ms); add(jtPEER, "peerCommand", 0, true, 200ms, 2500ms); add(jtDISK, "diskAccess", 0, true, 500ms, 1000ms); diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index d67b39c004..08c30f0da9 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -641,6 +641,17 @@ Config::loadFromString(std::string const& fileContents) auto sec = section(SECTION_REDUCE_RELAY); VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false); VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false); + TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false); + TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false); + TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20); + TX_RELAY_PERCENTAGE = sec.value_or("tx_relay_percentage", 25); + if (TX_RELAY_PERCENTAGE < 10 || TX_RELAY_PERCENTAGE > 100 || + TX_REDUCE_RELAY_MIN_PEERS < 10) + Throw( + "Invalid " SECTION_REDUCE_RELAY + ", tx_min_peers must be greater or equal to 10" + ", tx_relay_percentage must be greater or equal to 10 " + "and less or equal to 100"); } if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_)) diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index 65050ca26f..844eafb86c 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -173,6 +173,19 @@ public: uint256 const& uid, PublicKey const& validator) = 0; + /** Relay a transaction. If the tx reduce-relay feature is enabled then + * randomly select peers to relay to and queue transaction's hash + * for the rest of the peers. + * @param hash transaction's hash + * @param m transaction's protocol message to relay + * @param toSkip peers which have already seen this transaction + */ + virtual void + relay( + uint256 const& hash, + protocol::TMTransaction& m, + std::set const& toSkip) = 0; + /** Visit every active peer. * * The visitor must be invocable as: @@ -225,6 +238,12 @@ public: */ virtual std::optional networkID() const = 0; + + /** Returns tx reduce-relay metrics + @return json value of tx reduce-relay metrics + */ + virtual Json::Value + txMetrics() const = 0; }; } // namespace ripple diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index 49dc2fa096..ba41597415 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -66,6 +66,18 @@ public: virtual beast::IP::Endpoint getRemoteAddress() const = 0; + /** Send aggregated transactions' hashes. */ + virtual void + sendTxQueue() = 0; + + /** Aggregate transaction's hash. */ + virtual void + addTxQueue(uint256 const&) = 0; + + /** Remove hash from the transactions' hashes queue. */ + virtual void + removeTxQueue(uint256 const&) = 0; + /** Adjust this peer's load balance based on the type of load imposed. */ virtual void charge(Resource::Charge const& fee) = 0; @@ -121,6 +133,9 @@ public: virtual bool compressionEnabled() const = 0; + + virtual bool + txReduceRelayEnabled() const = 0; }; } // namespace ripple diff --git a/src/ripple/overlay/ReduceRelayCommon.h b/src/ripple/overlay/ReduceRelayCommon.h index 5e7f585d8f..3b87c3c8c1 100644 --- a/src/ripple/overlay/ReduceRelayCommon.h +++ b/src/ripple/overlay/ReduceRelayCommon.h @@ -48,6 +48,11 @@ static constexpr uint16_t MAX_SELECTED_PEERS = 5; // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; +// Maximum size of the aggregated transaction hashes per peer. +// Once we get to high tps throughput, this cap will prevent +// TMTransactions from exceeding the current protocol message +// size limit of 64MB. +static constexpr std::size_t MAX_TX_QUEUE_SIZE = 10000; } // namespace reduce_relay diff --git a/src/ripple/overlay/Slot.h b/src/ripple/overlay/Slot.h index dbb48a9218..b7a2129ed8 100644 --- a/src/ripple/overlay/Slot.h +++ b/src/ripple/overlay/Slot.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED #define RIPPLE_OVERLAY_SLOT_H_INCLUDED -#include +#include #include #include #include @@ -549,8 +549,8 @@ public: * @param app Applicaton reference * @param handler Squelch/unsquelch implementation */ - Slots(Application& app, SquelchHandler const& handler) - : handler_(handler), app_(app), journal_(app.journal("Slots")) + Slots(Logs& logs, SquelchHandler const& handler) + : handler_(handler), logs_(logs), journal_(logs.journal("Slots")) { } ~Slots() = default; @@ -655,7 +655,7 @@ private: hash_map> slots_; SquelchHandler const& handler_; // squelch/unsquelch handler - Application& app_; + Logs& logs_; beast::Journal const journal_; // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message @@ -717,7 +717,7 @@ Slots::updateSlotAndSquelch( auto it = slots_ .emplace(std::make_pair( validator, - Slot(handler_, app_.journal("Slot")))) + Slot(handler_, logs_.journal("Slot")))) .first; it->second.update(validator, id, type); } diff --git a/src/ripple/overlay/impl/ConnectAttempt.cpp b/src/ripple/overlay/impl/ConnectAttempt.cpp index 911215d807..f1d1c6ffe0 100644 --- a/src/ripple/overlay/impl/ConnectAttempt.cpp +++ b/src/ripple/overlay/impl/ConnectAttempt.cpp @@ -204,8 +204,9 @@ ConnectAttempt::onHandshake(error_code ec) req_ = makeRequest( !overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION, - app_.config().VP_REDUCE_RELAY_ENABLE, - app_.config().LEDGER_REPLAY); + app_.config().LEDGER_REPLAY, + app_.config().TX_REDUCE_RELAY_ENABLE, + app_.config().VP_REDUCE_RELAY_ENABLE); buildHandshake( req_, diff --git a/src/ripple/overlay/impl/Handshake.cpp b/src/ripple/overlay/impl/Handshake.cpp index a11d6b802b..2ea208f557 100644 --- a/src/ripple/overlay/impl/Handshake.cpp +++ b/src/ripple/overlay/impl/Handshake.cpp @@ -73,16 +73,19 @@ featureEnabled( std::string makeFeaturesRequestHeader( bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled) + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled) { std::stringstream str; if (comprEnabled) str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; - if (vpReduceRelayEnabled) - str << FEATURE_VPRR << "=1"; if (ledgerReplayEnabled) - str << FEATURE_LEDGER_REPLAY << "=1"; + str << FEATURE_LEDGER_REPLAY << "=1" << DELIM_FEATURE; + if (txReduceRelayEnabled) + str << FEATURE_TXRR << "=1" << DELIM_FEATURE; + if (vpReduceRelayEnabled) + str << FEATURE_VPRR << "=1" << DELIM_FEATURE; return str.str(); } @@ -90,16 +93,19 @@ std::string makeFeaturesResponseHeader( http_request_type const& headers, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled) + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled) { std::stringstream str; if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4")) str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; - if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR)) - str << FEATURE_VPRR << "=1"; if (ledgerReplayEnabled && featureEnabled(headers, FEATURE_LEDGER_REPLAY)) - str << FEATURE_LEDGER_REPLAY << "=1"; + str << FEATURE_LEDGER_REPLAY << "=1" << DELIM_FEATURE; + if (txReduceRelayEnabled && featureEnabled(headers, FEATURE_TXRR)) + str << FEATURE_TXRR << "=1" << DELIM_FEATURE; + if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR)) + str << FEATURE_VPRR << "=1" << DELIM_FEATURE; return str.str(); } @@ -363,8 +369,9 @@ auto makeRequest( bool crawlPublic, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled) -> request_type + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled) -> request_type { request_type m; m.method(boost::beast::http::verb::get); @@ -378,7 +385,10 @@ makeRequest( m.insert( "X-Protocol-Ctl", makeFeaturesRequestHeader( - comprEnabled, vpReduceRelayEnabled, ledgerReplayEnabled)); + comprEnabled, + ledgerReplayEnabled, + txReduceRelayEnabled, + vpReduceRelayEnabled)); return m; } @@ -406,8 +416,9 @@ makeResponse( makeFeaturesResponseHeader( req, app.config().COMPRESSION, - app.config().VP_REDUCE_RELAY_ENABLE, - app.config().LEDGER_REPLAY)); + app.config().LEDGER_REPLAY, + app.config().TX_REDUCE_RELAY_ENABLE, + app.config().VP_REDUCE_RELAY_ENABLE)); buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app); diff --git a/src/ripple/overlay/impl/Handshake.h b/src/ripple/overlay/impl/Handshake.h index 089f4ff6b6..2accf5d221 100644 --- a/src/ripple/overlay/impl/Handshake.h +++ b/src/ripple/overlay/impl/Handshake.h @@ -95,16 +95,20 @@ verifyHandshake( @param crawlPublic if true then server's IP/Port are included in crawl @param comprEnabled if true then compression feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled + @param txReduceRelayEnabled if true then transaction reduce-relay feature is + enabled + @param vpReduceRelayEnabled if true then validation/proposal reduce-relay + feature is enabled @return http request with empty body */ request_type makeRequest( bool crawlPublic, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled); + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled); /** Make http response @@ -133,11 +137,15 @@ makeResponse( // The format is: // X-Protocol-Ctl: feature1=value1[,value2]*[\s*;\s*feature2=value1[,value2]*]* // value: \S+ -static constexpr char FEATURE_COMPR[] = "compr"; // compression -static constexpr char FEATURE_VPRR[] = - "vprr"; // validation/proposal reduce-relay -static constexpr char FEATURE_LEDGER_REPLAY[] = - "ledgerreplay"; // ledger replay + +// compression feature +static constexpr char FEATURE_COMPR[] = "compr"; +// validation/proposal reduce-relay feature +static constexpr char FEATURE_VPRR[] = "vprr"; +// transaction reduce-relay feature +static constexpr char FEATURE_TXRR[] = "txrr"; +// ledger replay +static constexpr char FEATURE_LEDGER_REPLAY[] = "ledgerreplay"; static constexpr char DELIM_FEATURE[] = ";"; static constexpr char DELIM_VALUE[] = ","; @@ -210,15 +218,19 @@ peerFeatureEnabled( /** Make request header X-Protocol-Ctl value with supported features @param comprEnabled if true then compression feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled + @param txReduceRelayEnabled if true then transaction reduce-relay feature is + enabled + @param vpReduceRelayEnabled if true then validation/proposal reduce-relay + feature is enabled @return X-Protocol-Ctl header value */ std::string makeFeaturesRequestHeader( bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled); + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled); /** Make response header X-Protocol-Ctl value with supported features. If the request has a feature that we support enabled @@ -226,16 +238,21 @@ makeFeaturesRequestHeader( the response header. @param header request's header @param comprEnabled if true then compression feature is enabled - @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled + @param txReduceRelayEnabled if true then transaction reduce-relay feature is + enabled + @param vpReduceRelayEnabled if true then validation/proposal reduce-relay + feature is enabled + @param vpReduceRelayEnabled if true then reduce-relay feature is enabled @return X-Protocol-Ctl header value */ std::string makeFeaturesResponseHeader( http_request_type const& headers, bool comprEnabled, - bool vpReduceRelayEnabled, - bool ledgerReplayEnabled); + bool ledgerReplayEnabled, + bool txReduceRelayEnabled, + bool vpReduceRelayEnabled); } // namespace ripple diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index aabd6ab785..b4cb1f192a 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -86,6 +86,7 @@ Message::compress() case protocol::mtVALIDATORLIST: case protocol::mtVALIDATORLISTCOLLECTION: case protocol::mtREPLAY_DELTA_RESPONSE: + case protocol::mtTRANSACTIONS: return true; case protocol::mtPING: case protocol::mtCLUSTER: @@ -100,6 +101,7 @@ Message::compress() case protocol::mtREPLAY_DELTA_REQ: case protocol::mtGET_PEER_SHARD_INFO_V2: case protocol::mtPEER_SHARD_INFO_V2: + case protocol::mtHAVE_TRANSACTIONS: break; } return false; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 27ee59586b..91e37794e1 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -102,6 +103,8 @@ OverlayImpl::Timer::on_timer(error_code ec) overlay_.m_peerFinder->once_per_second(); overlay_.sendEndpoints(); overlay_.autoConnect(); + if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE) + overlay_.sendTxQueue(); if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0) overlay_.deleteIdlePeers(); @@ -137,7 +140,7 @@ OverlayImpl::OverlayImpl( , m_resolver(resolver) , next_id_(1) , timer_count_(0) - , slots_(app, *this) + , slots_(app.logs(), *this) , m_stats( std::bind(&OverlayImpl::collect_metrics, this), collector, @@ -1148,6 +1151,37 @@ OverlayImpl::getActivePeers() const return ret; } +Overlay::PeerSequence +OverlayImpl::getActivePeers( + std::set const& toSkip, + std::size_t& active, + std::size_t& disabled, + std::size_t& enabledInSkip) const +{ + Overlay::PeerSequence ret; + std::lock_guard lock(mutex_); + + active = ids_.size(); + ret.reserve(ids_.size() - toSkip.size()); + + for (auto& [id, w] : ids_) + { + if (auto p = w.lock()) + { + // tx rr feature disabled + if (!p->txReduceRelayEnabled()) + disabled++; + + if (toSkip.count(id) == 0) + ret.emplace_back(std::move(p)); + else if (p->txReduceRelayEnabled()) + enabledInSkip++; + } + } + + return ret; +} + void OverlayImpl::checkTracking(std::uint32_t index) { @@ -1264,6 +1298,67 @@ OverlayImpl::getManifestsMessage() return manifestMessage_; } +void +OverlayImpl::relay( + uint256 const& hash, + protocol::TMTransaction& m, + std::set const& toSkip) +{ + auto const sm = std::make_shared(m, protocol::mtTRANSACTION); + std::size_t total = 0; + std::size_t disabled = 0; + std::size_t enabledInSkip = 0; + + // total peers excluding peers in toSkip + auto peers = getActivePeers(toSkip, total, disabled, enabledInSkip); + auto minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled; + + if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay) + { + for (auto const& p : peers) + p->send(sm); + if (app_.config().TX_REDUCE_RELAY_ENABLE || + app_.config().TX_REDUCE_RELAY_METRICS) + txMetrics_.addMetrics(total, toSkip.size(), 0); + return; + } + + // We have more peers than the minimum (disabled + minimum enabled), + // relay to all disabled and some randomly selected enabled that + // do not have the transaction. + auto enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS + + (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100; + + txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled); + + if (enabledTarget > enabledInSkip) + std::shuffle(peers.begin(), peers.end(), default_prng()); + + JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() + << " selected " << enabledTarget << " skip " + << toSkip.size() << " disabled " << disabled; + + // count skipped peers with the enabled feature towards the quota + std::uint16_t enabledAndRelayed = enabledInSkip; + for (auto const& p : peers) + { + // always relay to a peer with the disabled feature + if (!p->txReduceRelayEnabled()) + { + p->send(sm); + } + else if (enabledAndRelayed < enabledTarget) + { + enabledAndRelayed++; + p->send(sm); + } + else + { + p->addTxQueue(hash); + } + } +} + //------------------------------------------------------------------------------ void @@ -1333,6 +1428,15 @@ OverlayImpl::sendEndpoints() } } +void +OverlayImpl::sendTxQueue() +{ + for_each([](auto const& p) { + if (p->txReduceRelayEnabled()) + p->sendTxQueue(); + }); +} + std::shared_ptr makeSquelchMessage( PublicKey const& validator, diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 764f2a79ac..5f23b9150e 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -128,6 +129,9 @@ private: reduce_relay::Slots slots_; + // Transaction reduce-relay metrics + metrics::TxMetrics txMetrics_; + // A message with the list of manifests we send to peers std::shared_ptr manifestMessage_; // Used to track whether we need to update the cached list of manifests @@ -197,6 +201,22 @@ public: PeerSequence getActivePeers() const override; + /** Get active peers excluding peers in toSkip. + @param toSkip peers to skip + @param active a number of active peers + @param disabled a number of peers with tx reduce-relay + feature disabled + @param enabledInSkip a number of peers with tx reduce-relay + feature enabled and in toSkip + @return active peers less peers in toSkip + */ + PeerSequence + getActivePeers( + std::set const& toSkip, + std::size_t& active, + std::size_t& disabled, + std::size_t& enabledInSkip) const; + void checkTracking(std::uint32_t) override; std::shared_ptr @@ -223,6 +243,12 @@ public: uint256 const& uid, PublicKey const& validator) override; + void + relay( + uint256 const&, + protocol::TMTransaction& m, + std::set const& skip) override; + std::shared_ptr getManifestsMessage(); @@ -411,6 +437,25 @@ public: void deletePeer(Peer::id_t id); + Json::Value + txMetrics() const override + { + return txMetrics_.json(); + } + + /** Add tx reduce-relay metrics. */ + template + void + addTxMetrics(Args... args) + { + if (!strand_.running_in_this_thread()) + return post( + strand_, + std::bind(&OverlayImpl::addTxMetrics, this, args...)); + + txMetrics_.addMetrics(args...); + } + private: void squelch( @@ -518,6 +563,10 @@ private: void sendEndpoints(); + /** Send once a second transactions' hashes aggregated by peers. */ + void + sendTxQueue(); + /** Check if peers stopped relaying messages * and if slots stopped receiving messages from the validator */ void diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 103d21d80a..e6d75bf44d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -108,6 +109,10 @@ PeerImp::PeerImp( app_.config().COMPRESSION) ? Compressed::On : Compressed::Off) + , txReduceRelayEnabled_(peerFeatureEnabled( + headers_, + FEATURE_TXRR, + app_.config().TX_REDUCE_RELAY_ENABLE)) , vpReduceRelayEnabled_(peerFeatureEnabled( headers_, FEATURE_VPRR, @@ -118,11 +123,13 @@ PeerImp::PeerImp( app_.config().LEDGER_REPLAY)) , ledgerReplayMsgHandler_(app, app.getLedgerReplayer()) { - JLOG(journal_.debug()) << " compression enabled " - << (compressionEnabled_ == Compressed::On) - << " vp reduce-relay enabled " - << vpReduceRelayEnabled_ << " on " << remote_address_ - << " " << id_; + JLOG(journal_.info()) << "compression enabled " + << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay enabled " + << vpReduceRelayEnabled_ + << " tx reduce-relay enabled " + << txReduceRelayEnabled_ << " on " << remote_address_ + << " " << id_; } PeerImp::~PeerImp() @@ -285,6 +292,54 @@ PeerImp::send(std::shared_ptr const& m) std::placeholders::_2))); } +void +PeerImp::sendTxQueue() +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this())); + + if (!txQueue_.empty()) + { + protocol::TMHaveTransactions ht; + std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) { + ht.add_hashes(hash.data(), hash.size()); + }); + JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size(); + txQueue_.clear(); + send(std::make_shared(ht, protocol::mtHAVE_TRANSACTIONS)); + } +} + +void +PeerImp::addTxQueue(uint256 const& hash) +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash)); + + if (txQueue_.size() == reduce_relay::MAX_TX_QUEUE_SIZE) + { + JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap"; + sendTxQueue(); + } + + txQueue_.insert(hash); + JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size(); +} + +void +PeerImp::removeTxQueue(uint256 const& hash) +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, + std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash)); + + auto removed = txQueue_.erase(hash); + JLOG(p_journal_.trace()) << "removeTxQueue " << removed; +} + void PeerImp::charge(Resource::Charge const& fee) { @@ -967,8 +1022,25 @@ PeerImp::onMessageBegin( load_event_ = app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type)); fee_ = Resource::feeLightPeer; - overlay_.reportTraffic( - TrafficCount::categorize(*m, type, true), true, static_cast(size)); + auto const category = TrafficCount::categorize(*m, type, true); + overlay_.reportTraffic(category, true, static_cast(size)); + using namespace protocol; + if ((type == MessageType::mtTRANSACTION || + type == MessageType::mtHAVE_TRANSACTIONS || + type == MessageType::mtTRANSACTIONS || + // GET_OBJECTS + category == TrafficCount::category::get_transactions || + // GET_LEDGER + category == TrafficCount::category::ld_tsc_get || + category == TrafficCount::category::ld_tsc_share || + // LEDGER_DATA + category == TrafficCount::category::gl_tsc_share || + category == TrafficCount::category::gl_tsc_get) && + (txReduceRelayEnabled() || app_.config().TX_REDUCE_RELAY_METRICS)) + { + overlay_.addTxMetrics( + static_cast(type), static_cast(size)); + } JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " << uncompressed_size << " " << isCompressed; } @@ -1440,6 +1512,14 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) +{ + handleTransaction(m, true); +} + +void +PeerImp::handleTransaction( + std::shared_ptr const& m, + bool eraseTxQueue) { if (tracking_.load() == Tracking::diverged) return; @@ -1472,6 +1552,11 @@ PeerImp::onMessage(std::shared_ptr const& m) JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; } + // Erase only if the server has seen this tx. If the server has not + // seen this tx then the tx could not has been queued for this peer. + else if (eraseTxQueue && txReduceRelayEnabled()) + removeTxQueue(txID); + return; } @@ -2509,6 +2594,9 @@ PeerImp::onMessage(std::shared_ptr const& m) { protocol::TMGetObjectByHash& packet = *m; + JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type() + << " " << packet.objects_size(); + if (packet.query()) { // this is a query @@ -2524,6 +2612,25 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } + if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS) + { + if (!txReduceRelayEnabled()) + { + JLOG(p_journal_.error()) + << "TMGetObjectByHash: tx reduce-relay is disabled"; + fee_ = Resource::feeInvalidRequest; + return; + } + + std::weak_ptr weak = shared_from_this(); + app_.getJobQueue().addJob( + jtREQUESTED_TXN, "doTransactions", [weak, m](Job&) { + if (auto peer = weak.lock()) + peer->doTransactions(m); + }); + return; + } + fee_ = Resource::feeMediumBurdenPeer; protocol::TMGetObjectByHash reply; @@ -2644,6 +2751,98 @@ PeerImp::onMessage(std::shared_ptr const& m) } } +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (!txReduceRelayEnabled()) + { + JLOG(p_journal_.error()) + << "TMHaveTransactions: tx reduce-relay is disabled"; + fee_ = Resource::feeInvalidRequest; + return; + } + + std::weak_ptr weak = shared_from_this(); + app_.getJobQueue().addJob( + jtMISSING_TXN, "handleHaveTransactions", [weak, m](Job&) { + if (auto peer = weak.lock()) + peer->handleHaveTransactions(m); + }); +} + +void +PeerImp::handleHaveTransactions( + std::shared_ptr const& m) +{ + protocol::TMGetObjectByHash tmBH; + tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS); + tmBH.set_query(true); + + JLOG(p_journal_.trace()) + << "received TMHaveTransactions " << m->hashes_size(); + + for (std::uint32_t i = 0; i < m->hashes_size(); i++) + { + if (!stringIsUint256Sized(m->hashes(i))) + { + JLOG(p_journal_.error()) + << "TMHaveTransactions with invalid hash size"; + fee_ = Resource::feeInvalidRequest; + return; + } + + uint256 hash(m->hashes(i)); + + auto txn = app_.getMasterTransaction().fetch_from_cache(hash); + + JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn; + + if (!txn) + { + JLOG(p_journal_.debug()) << "adding transaction to request"; + + auto obj = tmBH.add_objects(); + obj->set_hash(hash.data(), hash.size()); + } + else + { + // Erase only if a peer has seen this tx. If the peer has not + // seen this tx then the tx could not has been queued for this + // peer. + removeTxQueue(hash); + } + } + + JLOG(p_journal_.trace()) + << "transaction request object is " << tmBH.objects_size(); + + if (tmBH.objects_size() > 0) + send(std::make_shared(tmBH, protocol::mtGET_OBJECTS)); +} + +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (!txReduceRelayEnabled()) + { + JLOG(p_journal_.error()) + << "TMTransactions: tx reduce-relay is disabled"; + fee_ = Resource::feeInvalidRequest; + return; + } + + JLOG(p_journal_.trace()) + << "received TMTransactions " << m->transactions_size(); + + overlay_.addTxMetrics(m->transactions_size()); + + for (std::uint32_t i = 0; i < m->transactions_size(); ++i) + handleTransaction( + std::shared_ptr( + m->mutable_transactions(i), [](protocol::TMTransaction*) {}), + false); +} + void PeerImp::onMessage(std::shared_ptr const& m) { @@ -2740,6 +2939,61 @@ PeerImp::doFetchPack(const std::shared_ptr& packet) }); } +void +PeerImp::doTransactions( + std::shared_ptr const& packet) +{ + protocol::TMTransactions reply; + + JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx " + << packet->objects_size(); + + if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE) + { + JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes"; + fee_ = Resource::feeInvalidRequest; + return; + } + + for (std::uint32_t i = 0; i < packet->objects_size(); ++i) + { + auto const& obj = packet->objects(i); + + if (!stringIsUint256Sized(obj.hash())) + { + fee_ = Resource::feeInvalidRequest; + return; + } + + uint256 hash(obj.hash()); + + auto txn = app_.getMasterTransaction().fetch_from_cache(hash); + + if (!txn) + { + JLOG(p_journal_.error()) << "doTransactions, transaction not found " + << Slice(hash.data(), hash.size()); + fee_ = Resource::feeInvalidRequest; + return; + } + + Serializer s; + auto tx = reply.add_transactions(); + auto sttx = txn->getSTransaction(); + sttx->add(s); + tx->set_rawtransaction(s.data(), s.size()); + tx->set_status( + txn->getStatus() == INCLUDED ? protocol::tsCURRENT + : protocol::tsNEW); + tx->set_receivetimestamp( + app_.timeKeeper().now().time_since_epoch().count()); + tx->set_deferred(txn->getSubmitResult().queued); + } + + if (reply.transactions_size() > 0) + send(std::make_shared(reply, protocol::mtTRANSACTIONS)); +} + void PeerImp::checkTransaction( int flags, diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 9c511000c7..f7bfbbc9d6 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -167,6 +168,13 @@ private: std::mutex mutable shardInfoMutex_; Compressed compressionEnabled_ = Compressed::Off; + + // Queue of transactions' hashes that have not been + // relayed. The hashes are sent once a second to a peer + // and the peer requests missing transactions from the node. + hash_set txQueue_; + // true if tx reduce-relay feature is enabled on the peer. + bool txReduceRelayEnabled_ = false; // true if validation/proposal reduce-relay feature is enabled // on the peer. bool vpReduceRelayEnabled_ = false; @@ -255,7 +263,7 @@ public: } // Work-around for calling shared_from_this in constructors - void + virtual void run(); // Called when Overlay gets a stop request. @@ -269,6 +277,22 @@ public: void send(std::shared_ptr const& m) override; + /** Send aggregated transactions' hashes */ + void + sendTxQueue() override; + + /** Add transaction's hash to the transactions' hashes queue + @param hash transaction's hash + */ + void + addTxQueue(uint256 const& hash) override; + + /** Remove transaction's hash from the transactions' hashes queue + @param hash transaction's hash + */ + void + removeTxQueue(uint256 const& hash) override; + /** Send a set of PeerFinder endpoints as a protocol message. */ template < class FwdIt, @@ -401,6 +425,12 @@ public: return compressionEnabled_ == Compressed::On; } + bool + txReduceRelayEnabled() const override + { + return txReduceRelayEnabled_; + } + private: void close(); @@ -453,6 +483,29 @@ private: void onWriteMessage(error_code ec, std::size_t bytes_transferred); + /** Called from onMessage(TMTransaction(s)). + @param m Transaction protocol message + @param eraseTxQueue is true when called from onMessage(TMTransaction) + and is false when called from onMessage(TMTransactions). If true then + the transaction hash is erased from txQueue_. Don't need to erase from + the queue when called from onMessage(TMTransactions) because this + message is a response to the missing transactions request and the queue + would not have any of these transactions. + */ + void + handleTransaction( + std::shared_ptr const& m, + bool eraseTxQueue); + + /** Handle protocol message with hashes of transactions that have not + been relayed by an upstream node down to its peers - request + transactions, which have not been relayed to this peer. + @param m protocol message with transactions' hashes + */ + void + handleHaveTransactions( + std::shared_ptr const& m); + // Check if reduce-relay feature is enabled and // reduce_relay::WAIT_ON_BOOTUP time passed since the start bool @@ -518,6 +571,10 @@ public: void onMessage(std::shared_ptr const& m); void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); @@ -547,6 +604,13 @@ private: std::uint32_t version, std::vector const& blobs); + /** Process peer's request to send missing transactions. The request is + sent in response to TMHaveTransactions. + @param packet protocol message containing missing transactions' hashes. + */ + void + doTransactions(std::shared_ptr const& packet); + void checkTransaction( int flags, @@ -628,6 +692,10 @@ PeerImp::PeerImp( app_.config().COMPRESSION) ? Compressed::On : Compressed::Off) + , txReduceRelayEnabled_(peerFeatureEnabled( + headers_, + FEATURE_TXRR, + app_.config().TX_REDUCE_RELAY_ENABLE)) , vpReduceRelayEnabled_(peerFeatureEnabled( headers_, FEATURE_VPRR, @@ -640,11 +708,13 @@ PeerImp::PeerImp( { read_buffer_.commit(boost::asio::buffer_copy( read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers)); - JLOG(journal_.debug()) << "compression enabled " - << (compressionEnabled_ == Compressed::On) - << " vp reduce-relay enabled " - << vpReduceRelayEnabled_ << " on " << remote_address_ - << " " << id_; + JLOG(journal_.info()) << "compression enabled " + << (compressionEnabled_ == Compressed::On) + << " vp reduce-relay enabled " + << vpReduceRelayEnabled_ + << " tx reduce-relay enabled " + << txReduceRelayEnabled_ << " on " << remote_address_ + << " " << id_; } template diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index bbf7c1e16c..09861353a1 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -95,6 +95,10 @@ protocolMessageName(int type) return "peer_shard_info"; case protocol::mtGET_OBJECTS: return "get_objects"; + case protocol::mtHAVE_TRANSACTIONS: + return "have_transactions"; + case protocol::mtTRANSACTIONS: + return "transactions"; case protocol::mtSQUELCH: return "squelch"; case protocol::mtPROOF_PATH_REQ: @@ -453,6 +457,14 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtHAVE_TRANSACTIONS: + success = detail::invoke( + *header, buffers, handler); + break; + case protocol::mtTRANSACTIONS: + success = detail::invoke( + *header, buffers, handler); + break; case protocol::mtSQUELCH: success = detail::invoke(*header, buffers, handler); diff --git a/src/ripple/overlay/impl/TrafficCount.cpp b/src/ripple/overlay/impl/TrafficCount.cpp index 5ae5abc3bf..9b35d47683 100644 --- a/src/ripple/overlay/impl/TrafficCount.cpp +++ b/src/ripple/overlay/impl/TrafficCount.cpp @@ -138,6 +138,9 @@ TrafficCount::categorize( ? TrafficCount::category::share_fetch_pack : TrafficCount::category::get_fetch_pack; + if (msg->type() == protocol::TMGetObjectByHash::otTRANSACTIONS) + return TrafficCount::category::get_transactions; + return (msg->query() == inbound) ? TrafficCount::category::share_hash : TrafficCount::category::get_hash; } @@ -154,6 +157,12 @@ TrafficCount::categorize( if (type == protocol::mtREPLAY_DELTA_RESPONSE) return TrafficCount::category::replay_delta_response; + if (type == protocol::mtHAVE_TRANSACTIONS) + return TrafficCount::category::have_transactions; + + if (type == protocol::mtTRANSACTIONS) + return TrafficCount::category::requested_transactions; + return TrafficCount::category::unknown; } diff --git a/src/ripple/overlay/impl/TrafficCount.h b/src/ripple/overlay/impl/TrafficCount.h index fbb3fb9ed1..9e212da791 100644 --- a/src/ripple/overlay/impl/TrafficCount.h +++ b/src/ripple/overlay/impl/TrafficCount.h @@ -136,6 +136,9 @@ public: share_fetch_pack, get_fetch_pack, + // TMGetObjectByHash: transactions + get_transactions, + // TMGetObjectByHash: generic share_hash, get_hash, @@ -148,6 +151,12 @@ public: replay_delta_request, replay_delta_response, + // TMHaveTransactions + have_transactions, + + // TMTransactions + requested_transactions, + unknown // must be last }; @@ -230,13 +239,16 @@ protected: {"getobject_CAS_get"}, // category::get_cas_object {"getobject_Fetch_Pack_share"}, // category::share_fetch_pack {"getobject_Fetch Pack_get"}, // category::get_fetch_pack + {"getobject_Transactions_get"}, // category::get_transactions {"getobject_share"}, // category::share_hash {"getobject_get"}, // category::get_hash {"proof_path_request"}, // category::proof_path_request - {"proof_path_response"}, // category::proof_path_response - {"replay_delta_request"}, // category::replay_delta_request - {"replay_delta_response"}, // category::replay_delta_response - {"unknown"} // category::unknown + {"proof_path_response"}, // category::proof_path_response + {"replay_delta_request"}, // category::replay_delta_request + {"replay_delta_response"}, // category::replay_delta_response + {"have_transactions"}, // category::have_transactions + {"requested_transactions"}, // category::transactions + {"unknown"} // category::unknown }}; }; diff --git a/src/ripple/overlay/impl/TxMetrics.cpp b/src/ripple/overlay/impl/TxMetrics.cpp new file mode 100644 index 0000000000..f77f746b3d --- /dev/null +++ b/src/ripple/overlay/impl/TxMetrics.cpp @@ -0,0 +1,151 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "ripple/overlay/impl/TxMetrics.h" +#include "ripple/protocol/jss.h" + +#include + +namespace ripple { + +namespace metrics { + +void +TxMetrics::addMetrics(protocol::MessageType type, std::uint32_t val) +{ + auto add = [&](auto& m, std::uint32_t val) { + std::lock_guard lock(mutex); + m.addMetrics(val); + }; + + switch (type) + { + case protocol::MessageType::mtTRANSACTION: + add(tx, val); + break; + case protocol::MessageType::mtHAVE_TRANSACTIONS: + add(haveTx, val); + break; + case protocol::MessageType::mtGET_LEDGER: + add(getLedger, val); + break; + case protocol::MessageType::mtLEDGER_DATA: + add(ledgerData, val); + break; + case protocol::MessageType::mtTRANSACTIONS: + add(transactions, val); + break; + default: + return; + } +} + +void +TxMetrics::addMetrics( + std::uint32_t selected, + std::uint32_t suppressed, + std::uint32_t notenabled) +{ + std::lock_guard lock(mutex); + selectedPeers.addMetrics(selected); + suppressedPeers.addMetrics(suppressed); + notEnabled.addMetrics(notenabled); +} + +void +TxMetrics::addMetrics(std::uint32_t missing) +{ + std::lock_guard lock(mutex); + missingTx.addMetrics(missing); +} + +void +MultipleMetrics::addMetrics(std::uint32_t val2) +{ + addMetrics(1, val2); +} + +void +MultipleMetrics::addMetrics(std::uint32_t val1, std::uint32_t val2) +{ + m1.addMetrics(val1); + m2.addMetrics(val2); +} + +void +SingleMetrics::addMetrics(std::uint32_t val) +{ + using namespace std::chrono_literals; + accum += val; + N++; + auto const timeElapsed = clock_type::now() - intervalStart; + auto const timeElapsedInSecs = + std::chrono::duration_cast(timeElapsed); + + if (timeElapsedInSecs >= 1s) + { + auto const avg = accum / (perTimeUnit ? timeElapsedInSecs.count() : N); + rollingAvgAggreg.push_back(avg); + + auto const total = std::accumulate( + rollingAvgAggreg.begin(), rollingAvgAggreg.end(), 0ull); + rollingAvg = total / rollingAvgAggreg.size(); + + intervalStart = clock_type::now(); + accum = 0; + N = 0; + } +} + +Json::Value +TxMetrics::json() const +{ + std::lock_guard l(mutex); + + Json::Value ret(Json::objectValue); + + ret[jss::txr_tx_cnt] = std::to_string(tx.m1.rollingAvg); + ret[jss::txr_tx_sz] = std::to_string(tx.m2.rollingAvg); + + ret[jss::txr_have_txs_cnt] = std::to_string(haveTx.m1.rollingAvg); + ret[jss::txr_have_txs_sz] = std::to_string(haveTx.m2.rollingAvg); + + ret[jss::txr_get_ledger_cnt] = std::to_string(getLedger.m1.rollingAvg); + ret[jss::txr_get_ledger_sz] = std::to_string(getLedger.m2.rollingAvg); + + ret[jss::txr_ledger_data_cnt] = std::to_string(ledgerData.m1.rollingAvg); + ret[jss::txr_ledger_data_sz] = std::to_string(ledgerData.m2.rollingAvg); + + ret[jss::txr_transactions_cnt] = std::to_string(transactions.m1.rollingAvg); + ret[jss::txr_transactions_sz] = std::to_string(transactions.m2.rollingAvg); + + ret[jss::txr_selected_cnt] = std::to_string(selectedPeers.rollingAvg); + + ret[jss::txr_suppressed_cnt] = std::to_string(suppressedPeers.rollingAvg); + + ret[jss::txr_not_enabled_cnt] = std::to_string(notEnabled.rollingAvg); + + ret[jss::txr_missing_tx_freq] = std::to_string(missingTx.rollingAvg); + + return ret; +} + +} // namespace metrics + +} // namespace ripple diff --git a/src/ripple/overlay/impl/TxMetrics.h b/src/ripple/overlay/impl/TxMetrics.h new file mode 100644 index 0000000000..a37d7e7a48 --- /dev/null +++ b/src/ripple/overlay/impl/TxMetrics.h @@ -0,0 +1,141 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_TXMETRICS_H_INCLUDED +#define RIPPLE_OVERLAY_TXMETRICS_H_INCLUDED + +#include "ripple/json/json_value.h" +#include "ripple/protocol/messages.h" + +#include + +#include +#include + +namespace ripple { + +namespace metrics { + +/** Run single metrics rolling average. Can be either average of a value + per second or average of a value's sample per second. For instance, + for transaction it makes sense to have transaction bytes and count + per second, but for a number of selected peers to relay per transaction + it makes sense to have sample's average. + */ +struct SingleMetrics +{ + /** Class constructor + @param ptu if true then calculate metrics per second, otherwise + sample's average + */ + SingleMetrics(bool ptu = true) : perTimeUnit(ptu) + { + } + using clock_type = std::chrono::steady_clock; + clock_type::time_point intervalStart{clock_type::now()}; + std::uint64_t accum{0}; + std::uint64_t rollingAvg{0}; + std::uint32_t N{0}; + bool perTimeUnit{true}; + boost::circular_buffer rollingAvgAggreg{30, 0ull}; + /** Add metrics value + * @param val metrics value, either bytes or count + */ + void + addMetrics(std::uint32_t val); +}; + +/** Run two metrics. For instance message size and count for + protocol messages. */ +struct MultipleMetrics +{ + MultipleMetrics(bool ptu1 = true, bool ptu2 = true) : m1(ptu1), m2(ptu2) + { + } + + SingleMetrics m1; + SingleMetrics m2; + /** Add metrics to m2. m1 in this case aggregates the frequency. + @param val2 m2 metrics value + */ + void + addMetrics(std::uint32_t val2); + /** Add metrics to m1 and m2. + * @param val1 m1 metrics value + * @param val2 m2 metrics value + */ + void + addMetrics(std::uint32_t val1, std::uint32_t val2); +}; + +/** Run transaction reduce-relay feature related metrics */ +struct TxMetrics +{ + mutable std::mutex mutex; + // TMTransaction bytes and count per second + MultipleMetrics tx; + // TMHaveTransactions bytes and count per second + MultipleMetrics haveTx; + // TMGetLedger bytes and count per second + MultipleMetrics getLedger; + // TMLedgerData bytes and count per second + MultipleMetrics ledgerData; + // TMTransactions bytes and count per second + MultipleMetrics transactions; + // Peers selected to relay in each transaction sample average + SingleMetrics selectedPeers{false}; + // Peers suppressed to relay in each transaction sample average + SingleMetrics suppressedPeers{false}; + // Peers with tx reduce-relay feature not enabled + SingleMetrics notEnabled{false}; + // TMTransactions number of transactions count per second + SingleMetrics missingTx; + /** Add protocol message metrics + @param type protocol message type + @param val message size in bytes + */ + void + addMetrics(protocol::MessageType type, std::uint32_t val); + /** Add peers selected for relaying and suppressed peers metrics. + @param selected number of selected peers to relay + @param suppressed number of suppressed peers + @param notEnabled number of peers with tx reduce-relay featured disabled + */ + void + addMetrics( + std::uint32_t selected, + std::uint32_t suppressed, + std::uint32_t notEnabled); + /** Add number of missing transactions that a node requested + @param missing number of missing transactions + */ + void + addMetrics(std::uint32_t missing); + /** Get json representation of the metrics + @return json object + */ + Json::Value + json() const; +}; + +} // namespace metrics + +} // namespace ripple + +#endif \ No newline at end of file diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 5306aee7b7..74cbfe8f6c 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -31,6 +31,8 @@ enum MessageType mtREPLAY_DELTA_RESPONSE = 60; mtGET_PEER_SHARD_INFO_V2 = 61; mtPEER_SHARD_INFO_V2 = 62; + mtHAVE_TRANSACTIONS = 63; + mtTRANSACTIONS = 64; } // token, iterations, target, challenge = issue demand for proof of work @@ -176,6 +178,11 @@ message TMTransaction optional bool deferred = 4; // not applied to open ledger } +message TMTransactions +{ + repeated TMTransaction transactions = 1; +} + enum NodeStatus { @@ -316,6 +323,7 @@ message TMGetObjectByHash otSTATE_NODE = 4; otCAS_OBJECT = 5; otFETCH_PACK = 6; + otTRANSACTIONS = 7; } required ObjectType type = 1; @@ -437,3 +445,8 @@ message TMReplayDeltaResponse optional TMReplyError error = 4; } +message TMHaveTransactions +{ + repeated bytes hashes = 1; +} + diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 1b0c4793aa..f1218474ae 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -570,6 +570,20 @@ JSS(tx_json); // in/out: TransactionSign JSS(tx_signing_hash); // out: TransactionSign JSS(tx_unsigned); // out: TransactionSign JSS(txn_count); // out: NetworkOPs +JSS(txr_tx_cnt); // out: protocol message tx's count +JSS(txr_tx_sz); // out: protocol message tx's size +JSS(txr_have_txs_cnt); // out: protocol message have tx count +JSS(txr_have_txs_sz); // out: protocol message have tx size +JSS(txr_get_ledger_cnt); // out: protocol message get ledger count +JSS(txr_get_ledger_sz); // out: protocol message get ledger size +JSS(txr_ledger_data_cnt); // out: protocol message ledger data count +JSS(txr_ledger_data_sz); // out: protocol message ledger data size +JSS(txr_transactions_cnt); // out: protocol message get object count +JSS(txr_transactions_sz); // out: protocol message get object size +JSS(txr_selected_cnt); // out: selected peers count +JSS(txr_suppressed_cnt); // out: suppressed peers count +JSS(txr_not_enabled_cnt); // out: peers with tx reduce-relay disabled count +JSS(txr_missing_tx_freq); // out: missing tx frequency average JSS(txs); // out: TxHistory JSS(type); // in: AccountObjects // out: NetworkOPs diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index ec8a0ff355..5fb42268f5 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -141,6 +141,8 @@ doTxJson(RPC::JsonContext&); Json::Value doTxHistory(RPC::JsonContext&); Json::Value +doTxReduceRelay(RPC::JsonContext&); +Json::Value doUnlList(RPC::JsonContext&); Json::Value doUnsubscribe(RPC::JsonContext&); diff --git a/src/ripple/rpc/handlers/TxReduceRelay.cpp b/src/ripple/rpc/handlers/TxReduceRelay.cpp new file mode 100644 index 0000000000..bb883b8dea --- /dev/null +++ b/src/ripple/rpc/handlers/TxReduceRelay.cpp @@ -0,0 +1,33 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +namespace ripple { + +Json::Value +doTxReduceRelay(RPC::JsonContext& context) +{ + return context.app.overlay().txMetrics(); +} + +} // namespace ripple diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp index c4360b7a5b..6e313931f8 100644 --- a/src/ripple/rpc/impl/Handler.cpp +++ b/src/ripple/rpc/impl/Handler.cpp @@ -147,6 +147,7 @@ Handler const handlerArray[]{ {"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION}, {"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION}, {"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION}, + {"tx_reduce_relay", byRef(&doTxReduceRelay), Role::USER, NO_CONDITION}, {"unl_list", byRef(&doUnlList), Role::ADMIN, NO_CONDITION}, {"validation_create", byRef(&doValidationCreate), diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index 7e1ecf9024..549495d40b 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -289,6 +289,23 @@ public: { return false; } + void + sendTxQueue() override + { + } + void + addTxQueue(const uint256&) override + { + } + void + removeTxQueue(const uint256&) override + { + } + bool + txReduceRelayEnabled() const override + { + return false; + } bool ledgerReplayEnabled_; }; @@ -1060,7 +1077,8 @@ struct LedgerReplayer_test : public beast::unit_test::suite { testcase("handshake test"); auto handshake = [&](bool client, bool server, bool expecting) -> bool { - auto request = ripple::makeRequest(true, false, false, client); + auto request = + ripple::makeRequest(true, false, client, false, false); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/compression_test.cpp b/src/test/overlay/compression_test.cpp index 66155da20b..e24cb09f96 100644 --- a/src/test/overlay/compression_test.cpp +++ b/src/test/overlay/compression_test.cpp @@ -488,8 +488,9 @@ public: auto request = ripple::makeRequest( true, env->app().config().COMPRESSION, - env->app().config().VP_REDUCE_RELAY_ENABLE, - false); + false, + env->app().config().TX_REDUCE_RELAY_ENABLE, + env->app().config().VP_REDUCE_RELAY_ENABLE); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index dec1d87988..1839220dc4 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -159,6 +159,23 @@ public: { return false; } + bool + txReduceRelayEnabled() const override + { + return false; + } + void + sendTxQueue() override + { + } + void + addTxQueue(const uint256&) override + { + } + void + removeTxQueue(const uint256&) override + { + } }; /** Manually advanced clock. */ @@ -491,7 +508,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler public: using id_t = Peer::id_t; using clock_type = ManualClock; - OverlaySim(Application& app) : slots_(app, *this), app_(app) + OverlaySim(Application& app) : slots_(app.logs(), *this), logs_(app.logs()) { } @@ -545,7 +562,7 @@ public: Peer::id_t id; if (peersCache_.empty() || !useCache) { - peer = std::make_shared(*this, app_.journal("Squelch")); + peer = std::make_shared(*this, logs_.journal("Squelch")); id = peer->id(); } else @@ -665,7 +682,7 @@ private: Peers peers_; Peers peersCache_; reduce_relay::Slots slots_; - Application& app_; + Logs& logs_; }; class Network @@ -1398,7 +1415,8 @@ vp_squelched=1 auto run = [&](int npeers) { handler.maxDuration_ = 0; - reduce_relay::Slots slots(env_.app(), handler); + reduce_relay::Slots slots( + env_.app().logs(), handler); // 1st message from a new peer switches the slot // to counting state and resets the counts of all peers + // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold @@ -1494,8 +1512,9 @@ vp_squelched=1 auto request = ripple::makeRequest( true, env_.app().config().COMPRESSION, - env_.app().config().VP_REDUCE_RELAY_ENABLE, - false); + false, + env_.app().config().TX_REDUCE_RELAY_ENABLE, + env_.app().config().VP_REDUCE_RELAY_ENABLE); http_request_type http_request; http_request.version(request.version()); http_request.base() = request.base(); diff --git a/src/test/overlay/tx_reduce_relay_test.cpp b/src/test/overlay/tx_reduce_relay_test.cpp new file mode 100644 index 0000000000..7b26ad0962 --- /dev/null +++ b/src/test/overlay/tx_reduce_relay_test.cpp @@ -0,0 +1,275 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== +#include +#include +#include +#include +#include +#include + +namespace ripple { + +namespace test { + +class tx_reduce_relay_test : public beast::unit_test::suite +{ +public: + using socket_type = boost::asio::ip::tcp::socket; + using middle_type = boost::beast::tcp_stream; + using stream_type = boost::beast::ssl_stream; + using shared_context = std::shared_ptr; + +private: + void + doTest(const std::string& msg, bool log, std::function f) + { + testcase(msg); + f(log); + } + + void + testConfig(bool log) + { + doTest("Config Test", log, [&](bool log) { + auto test = [&](bool enable, + bool metrics, + std::uint16_t min, + std::uint16_t pct, + bool success = true) { + std::stringstream str("[reduce_relay]"); + str << "[reduce_relay]\n" + << "tx_enable=" << static_cast(enable) << "\n" + << "tx_metrics=" << static_cast(metrics) << "\n" + << "tx_min_peers=" << min << "\n" + << "tx_relay_percentage=" << pct << "\n"; + Config c; + try + { + c.loadFromString(str.str()); + + BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable); + BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics); + BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min); + BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct); + if (success) + pass(); + else + fail(); + } + catch (...) + { + if (success) + fail(); + else + pass(); + } + }; + + test(true, true, 20, 25); + test(false, false, 20, 25); + test(false, false, 20, 0, false); + test(false, false, 20, 101, false); + test(false, false, 9, 10, false); + test(false, false, 10, 9, false); + }); + } + + class PeerTest : public PeerImp + { + public: + PeerTest( + Application& app, + std::shared_ptr const& slot, + http_request_type&& request, + PublicKey const& publicKey, + ProtocolVersion protocol, + Resource::Consumer consumer, + std::unique_ptr&& stream_ptr, + OverlayImpl& overlay) + : PeerImp( + app, + sid_, + slot, + std::move(request), + publicKey, + protocol, + consumer, + std::move(stream_ptr), + overlay) + { + sid_++; + } + ~PeerTest() = default; + + void + run() override + { + } + void + send(std::shared_ptr const&) override + { + sendTx_++; + } + void + addTxQueue(const uint256& hash) override + { + queueTx_++; + } + static void + init() + { + queueTx_ = 0; + sendTx_ = 0; + sid_ = 0; + } + inline static std::size_t sid_ = 0; + inline static std::uint16_t queueTx_ = 0; + inline static std::uint16_t sendTx_ = 0; + }; + + std::uint16_t lid_{0}; + std::uint16_t rid_{1}; + shared_context context_; + ProtocolVersion protocolVersion_; + boost::beast::multi_buffer read_buf_; + +public: + tx_reduce_relay_test() + : context_(make_SSLContext("")), protocolVersion_{1, 7} + { + } + +private: + void + addPeer( + jtx::Env& env, + std::vector>& peers, + std::uint16_t& nDisabled) + { + auto& overlay = dynamic_cast(env.app().overlay()); + boost::beast::http::request request; + (nDisabled == 0) + ? (void)request.insert( + "X-Protocol-Ctl", + makeFeaturesRequestHeader(false, false, true, false)) + : (void)nDisabled--; + auto stream_ptr = std::make_unique( + socket_type(std::forward( + env.app().getIOService())), + *context_); + beast::IP::Endpoint local( + beast::IP::Address::from_string("172.1.1." + std::to_string(lid_))); + beast::IP::Endpoint remote( + beast::IP::Address::from_string("172.1.1." + std::to_string(rid_))); + PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519))); + auto consumer = overlay.resourceManager().newInboundEndpoint(remote); + auto slot = overlay.peerFinder().new_inbound_slot(local, remote); + auto const peer = std::make_shared( + env.app(), + slot, + std::move(request), + key, + protocolVersion_, + consumer, + std::move(stream_ptr), + overlay); + overlay.add_active(peer); + peers.emplace_back(peer); // overlay stores week ptr to PeerImp + lid_ += 2; + rid_ += 2; + assert(lid_ <= 254); + } + + void + testRelay( + std::string const& test, + bool txRREnabled, + std::uint16_t nPeers, + std::uint16_t nDisabled, + std::uint16_t minPeers, + std::uint16_t relayPercentage, + std::uint16_t expectRelay, + std::uint16_t expectQueue, + std::set const& toSkip = {}) + { + testcase(test); + jtx::Env env(*this); + std::vector> peers; + env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled; + env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers; + env.app().config().TX_RELAY_PERCENTAGE = relayPercentage; + PeerTest::init(); + lid_ = 0; + rid_ = 0; + for (int i = 0; i < nPeers; i++) + addPeer(env, peers, nDisabled); + protocol::TMTransaction m; + m.set_rawtransaction("transaction"); + m.set_deferred(false); + m.set_status(protocol::TransactionStatus::tsNEW); + env.app().overlay().relay(uint256{0}, m, toSkip); + BEAST_EXPECT( + PeerTest::sendTx_ == expectRelay && + PeerTest::queueTx_ == expectQueue); + } + + void + run() override + { + bool log = false; + std::set skip = {0, 1, 2, 3, 4}; + testConfig(log); + // relay to all peers, no hash queue + testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0); + // relay to nPeers - skip (10-5=5) + testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip); + // relay to all peers because min is greater than nPeers + testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0); + // relay to all peers because min + disabled is greater thant nPeers + testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0); + // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30), + // queue the rest (30) + testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30); + // relay to minPeers + 25% of (nPeers - nPeers) - skip + // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed + // (60-25-5=30) + testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip); + // relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed) + // (20+10+0.25*(70-20-10)=40), queue the rest (30) + testRelay("disabled", true, 70, 10, 20, 25, 40, 30); + // relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers + // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts + // towards relayed (70-35-5=30)) + testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip); + // relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled) + // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts + // towards relayed (15-5-10=0) + skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip); + // relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled) + // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts + // towards relayed (20-14=6) + skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}; + testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip); + } +}; + +BEAST_DEFINE_TESTSUITE(tx_reduce_relay, ripple_data, ripple); +} // namespace test +} // namespace ripple \ No newline at end of file