From fbfb4bd74ecfffcf3d77f28aade0b0fcebeebcbb Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Sat, 9 Dec 2017 17:10:54 -0800 Subject: [PATCH] Control transaction dispatch rate: Do not dispatch a transaction received from a peer for processing, if it has already been dispatched within the past ten seconds. Increase the number of transaction handlers that can be in flight in the job queue and decrease the relative cost for peers to share transaction and ledger data. Additionally, make better use of resources by adjusting the number of threads we initialize, by reverting commit 68b8ffdb638d07937f841f7217edeb25efdb3b5d. --- src/ripple/app/main/Application.cpp | 4 +--- src/ripple/app/misc/HashRouter.cpp | 12 ++++++++++ src/ripple/app/misc/HashRouter.h | 13 ++++++++++ src/ripple/core/JobQueue.h | 3 +-- src/ripple/core/impl/JobQueue.cpp | 11 +++------ src/ripple/overlay/impl/PeerImp.cpp | 12 ++++++---- src/ripple/resource/impl/Fees.cpp | 37 ++++++++++++----------------- src/ripple/resource/impl/Tuning.h | 6 ++--- 8 files changed, 56 insertions(+), 42 deletions(-) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 946374dccf..6a3b058591 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1050,9 +1050,7 @@ private: bool ApplicationImp::setup() { // VFALCO NOTE: 0 means use heuristics to determine the thread count. - m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone(), - config_->exists (SECTION_VALIDATOR_TOKEN) || - config_->exists (SECTION_VALIDATION_SEED)); + m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone()); // We want to intercept and wait for CTRL-C to terminate the process m_signals.add (SIGINT); diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 8e40712605..42a846a416 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -71,6 +71,18 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& return result.second; } +bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags, + Stopwatch::time_point now, std::chrono::seconds interval) +{ + std::lock_guard lock (mutex_); + + auto result = emplace(key); + auto& s = result.first; + s.addPeer (peer); + flags = s.getFlags (); + return s.shouldProcess (now, interval); +} + int HashRouter::getFlags (uint256 const& key) { std::lock_guard lock (mutex_); diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index a40d3558bf..aa27b255ba 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -118,12 +118,21 @@ private: return ++recoveries_ % limit != 0; } + bool shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval) + { + if (processed_ && ((*processed_ + interval) > now)) + return false; + processed_.emplace (now); + return true; + } + private: int flags_ = 0; std::set peers_; // This could be generalized to a map, if more // than one flag needs to expire independently. boost::optional relayed_; + boost::optional processed_; std::uint32_t recoveries_ = 0; }; @@ -161,6 +170,10 @@ public: bool addSuppressionPeer (uint256 const& key, PeerShortID peer, int& flags); + // Add a peer suppression and return whether the entry should be processed + bool shouldProcess (uint256 const& key, PeerShortID peer, + int& flags, Stopwatch::time_point now, std::chrono::seconds interval); + /** Set the flags on a hash. @return `true` if the flags were changed. `false` if unchanged. diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index b5c9228e89..303a5542b9 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -180,8 +180,7 @@ public: /** Set the number of thread serving the job queue to precisely this number. */ - void setThreadCount (int c, bool const standaloneMode, - bool const validator=true); + void setThreadCount (int c, bool const standaloneMode); /** Return a scoped LoadEvent. */ diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index f68897e802..0a83ffcb50 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -153,8 +153,7 @@ JobQueue::getJobCountGE (JobType t) const } void -JobQueue::setThreadCount (int c, bool const standaloneMode, - bool const validator) +JobQueue::setThreadCount (int c, bool const standaloneMode) { if (standaloneMode) { @@ -163,13 +162,9 @@ JobQueue::setThreadCount (int c, bool const standaloneMode, else if (c == 0) { c = static_cast(std::thread::hardware_concurrency()); - if (validator) - c = 2 + std::min(c, 4); // I/O will bottleneck - else - c *= 2; // Tested to improve stability under high RPC load. + c = 2 + std::min(c, 4); // I/O will bottleneck JLOG (m_journal.info()) << "Auto-tuning to " << c << - " validation/transaction/proposal threads for " << - (validator ? "" : "non-") << "validator."; + " validation/transaction/proposal threads."; } else { diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 442f6e063a..3bc8f15a88 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1055,8 +1055,9 @@ PeerImp::onMessage (std::shared_ptr const& m) int flags; - if (! app_.getHashRouter ().addSuppressionPeer ( - txID, id_, flags)) + constexpr std::chrono::seconds tx_interval = 10s; + if (! app_.getHashRouter ().shouldProcess ( + txID, id_, flags, clock_type::now(), tx_interval)) { // we have seen this transaction recently if (flags & SF_BAD) @@ -1064,8 +1065,9 @@ PeerImp::onMessage (std::shared_ptr const& m) fee_ = Resource::feeInvalidSignature; JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; - return; } + + return; } JLOG(p_journal_.debug()) << "Got tx " << txID; @@ -1088,7 +1090,9 @@ PeerImp::onMessage (std::shared_ptr const& m) } } - if (app_.getJobQueue().getJobCount(jtTRANSACTION) > 100) + // The maximum number of transactions to have in the job queue. + constexpr int max_transactions = 250; + if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions) { JLOG(p_journal_.info()) << "Transaction queue is full"; } diff --git a/src/ripple/resource/impl/Fees.cpp b/src/ripple/resource/impl/Fees.cpp index b1d03526be..0ca4e4b41d 100644 --- a/src/ripple/resource/impl/Fees.cpp +++ b/src/ripple/resource/impl/Fees.cpp @@ -23,31 +23,24 @@ namespace ripple { namespace Resource { -Charge const feeInvalidRequest ( 10, "malformed request" ); -Charge const feeRequestNoReply ( 1, "unsatisfiable request" ); -Charge const feeInvalidSignature ( 100, "invalid signature" ); -Charge const feeUnwantedData ( 15, "useless data" ); -Charge const feeBadData ( 20, "invalid data" ); +Charge const feeInvalidRequest ( 100, "malformed request" ); +Charge const feeRequestNoReply ( 10, "unsatisfiable request" ); +Charge const feeInvalidSignature ( 1000, "invalid signature" ); +Charge const feeUnwantedData ( 150, "useless data" ); +Charge const feeBadData ( 200, "invalid data" ); -Charge const feeInvalidRPC ( 10, "malformed RPC" ); -Charge const feeReferenceRPC ( 2, "reference RPC" ); -Charge const feeExceptionRPC ( 10, "exceptioned RPC" ); -Charge const feeLightRPC ( 5, "light RPC" ); // DAVID: Check the cost -Charge const feeLowBurdenRPC ( 20, "low RPC" ); -Charge const feeMediumBurdenRPC ( 40, "medium RPC" ); -Charge const feeHighBurdenRPC ( 300, "heavy RPC" ); +Charge const feeInvalidRPC ( 100, "malformed RPC" ); +Charge const feeReferenceRPC ( 20, "reference RPC" ); +Charge const feeExceptionRPC ( 100, "exceptioned RPC" ); +Charge const feeMediumBurdenRPC ( 400, "medium RPC" ); +Charge const feeHighBurdenRPC ( 3000, "heavy RPC" ); -Charge const feeLightPeer (1, "trivial peer request" ); -Charge const feeLowBurdenPeer (2, "simple peer request" ); -Charge const feeMediumBurdenPeer (50, "moderate peer request" ); -Charge const feeHighBurdenPeer (250, "heavy peer request" ); +Charge const feeLightPeer ( 1, "trivial peer request" ); +Charge const feeMediumBurdenPeer ( 250, "moderate peer request" ); +Charge const feeHighBurdenPeer ( 2000, "heavy peer request" ); -Charge const feeNewTrustedNote ( 10, "trusted note" ); -Charge const feeNewValidTx ( 10, "valid tx" ); -Charge const feeSatisfiedRequest ( 10, "needed data" ); - -Charge const feeWarning ( 200, "received warning" ); -Charge const feeDrop ( 300, "dropped" ); +Charge const feeWarning ( 2000, "received warning" ); +Charge const feeDrop ( 3000, "dropped" ); } } diff --git a/src/ripple/resource/impl/Tuning.h b/src/ripple/resource/impl/Tuning.h index 9b300be8fa..8368f77e1e 100644 --- a/src/ripple/resource/impl/Tuning.h +++ b/src/ripple/resource/impl/Tuning.h @@ -27,10 +27,10 @@ namespace Resource { enum { // Balance at which a warning is issued - warningThreshold = 500 + warningThreshold = 5000 // Balance at which the consumer is disconnected - ,dropThreshold = 1500 + ,dropThreshold = 15000 // The number of seconds until an inactive table item is removed ,secondsUntilExpiration = 300 @@ -40,7 +40,7 @@ enum ,decayWindowSeconds = 32 // The minimum balance required in order to include a load source in gossip - ,minimumGossipBalance = 100 + ,minimumGossipBalance = 1000 // Number of seconds until imported gossip expires ,gossipExpirationSeconds = 30