From 76ad06ef47e29efcf77cce57051c937bb23c3b60 Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Wed, 17 Jan 2018 08:34:22 -0800 Subject: [PATCH] Control transaction dispatch rate: Do not process a transaction received from a peer if it has been processed within the past ten seconds. Increase the number of transaction handlers that can be in flight in the job queue and decrease the relative cost for peers to share transaction and ledger data. Additionally, make better use of resources by adjusting the number of threads we initialize, by reverting commit 68b8ffdb638d07937f841f7217edeb25efdb3b5d. Performance counter modifications: * Create and display counters to track: 1) Pending transaction limit overruns. 2) Total peer disconnections. 3) Peers disconnections due to resource consumption. Avoid a potential double-free in Json library. --- src/ripple/app/misc/HashRouter.cpp | 6 ++--- src/ripple/app/misc/HashRouter.h | 4 +-- src/ripple/app/misc/NetworkOPs.cpp | 8 +++++- src/ripple/json/impl/json_value.cpp | 3 ++- src/ripple/json/json_value.h | 2 +- src/ripple/overlay/Overlay.h | 12 +++++++++ src/ripple/overlay/impl/OverlayImpl.h | 39 +++++++++++++++++++++++++++ src/ripple/overlay/impl/PeerImp.cpp | 9 ++++--- src/ripple/protocol/JsonFields.h | 4 +++ src/test/app/HashRouter_test.cpp | 19 +++++++++++++ 10 files changed, 95 insertions(+), 11 deletions(-) diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 42a846a41..49ed0e4ae 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -71,8 +71,8 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& return result.second; } -bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags, - Stopwatch::time_point now, std::chrono::seconds interval) +bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, + int& flags, std::chrono::seconds tx_interval) { std::lock_guard lock (mutex_); @@ -80,7 +80,7 @@ bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags auto& s = result.first; s.addPeer (peer); flags = s.getFlags (); - return s.shouldProcess (now, interval); + return s.shouldProcess (suppressionMap_.clock().now(), tx_interval); } int HashRouter::getFlags (uint256 const& key) diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index aa27b255b..f0b92d460 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -171,8 +171,8 @@ public: int& flags); // Add a peer suppression and return whether the entry should be processed - bool shouldProcess (uint256 const& key, PeerShortID peer, - int& flags, Stopwatch::time_point now, std::chrono::seconds interval); + bool shouldProcess (uint256 const& key, PeerShortID peer, int& flags, + std::chrono::seconds tx_interval); /** Set the flags on a hash. diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index b18587dde..140c86b89 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -2367,6 +2367,12 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) info[jss::state_accounting] = accounting_.json(); info[jss::uptime] = UptimeTimer::getInstance ().getElapsedSeconds (); + info[jss::jq_trans_overflow] = std::to_string( + app_.overlay().getJqTransOverflow()); + info[jss::peer_disconnects] = std::to_string( + app_.overlay().getPeerDisconnect()); + info[jss::peer_disconnects_resources] = std::to_string( + app_.overlay().getPeerDisconnectCharges()); return info; } @@ -3365,7 +3371,7 @@ Json::Value NetworkOPsImp::StateAccounting::json() const ret[states_[i]] = Json::objectValue; auto& state = ret[states_[i]]; state[jss::transitions] = counters[i].transitions; - state[jss::duration_us] = std::to_string (counters[i].dur.count()); + state[jss::duration_us] = std::to_string(counters[i].dur.count()); } return ret; diff --git a/src/ripple/json/impl/json_value.cpp b/src/ripple/json/impl/json_value.cpp index 3b4f06d8d..a9a965f58 100644 --- a/src/ripple/json/impl/json_value.cpp +++ b/src/ripple/json/impl/json_value.cpp @@ -344,7 +344,8 @@ Value::~Value () case arrayValue: case objectValue: - delete value_.map_; + if (value_.map_) + delete value_.map_; break; default: diff --git a/src/ripple/json/json_value.h b/src/ripple/json/json_value.h index 2644223ce..423942dd1 100644 --- a/src/ripple/json/json_value.h +++ b/src/ripple/json/json_value.h @@ -385,7 +385,7 @@ private: double real_; bool bool_; char* string_; - ObjectValues* map_; + ObjectValues* map_ {nullptr}; } value_; ValueType type_ : 8; int allocated_ : 1; // Notes: if declared as bool, bitfield is useless. diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index 103240bba..75cfdf971 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -227,6 +227,18 @@ public: std::size_t selectPeers (PeerSet& set, std::size_t limit, std::function< bool(std::shared_ptr const&)> score) = 0; + + /** Increment and retrieve counter for transaction job queue overflows. */ + virtual void incJqTransOverflow() = 0; + virtual std::uint64_t getJqTransOverflow() const = 0; + + /** Increment and retrieve counters for total peer disconnects, and + * disconnects we initiate for excessive resource consumption. + */ + virtual void incPeerDisconnect() = 0; + virtual std::uint64_t getPeerDisconnect() const = 0; + virtual void incPeerDisconnectCharges() = 0; + virtual std::uint64_t getPeerDisconnectCharges() const = 0; }; struct ScoreHasLedger diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 6791c2020..81b518fcc 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -119,6 +119,9 @@ private: Resolver& m_resolver; std::atomic next_id_; int timer_count_; + std::atomic jqTransOverflow_ {0}; + std::atomic peerDisconnects_ {0}; + std::atomic peerDisconnectsCharges_ {0}; //-------------------------------------------------------------------------- @@ -301,6 +304,42 @@ public: bool isInbound, int bytes); + void + incJqTransOverflow() override + { + ++jqTransOverflow_; + } + + std::uint64_t + getJqTransOverflow() const override + { + return jqTransOverflow_; + } + + void + incPeerDisconnect() override + { + ++peerDisconnects_; + } + + std::uint64_t + getPeerDisconnect() const override + { + return peerDisconnects_; + } + + void + incPeerDisconnectCharges() override + { + ++peerDisconnectsCharges_; + } + + std::uint64_t + getPeerDisconnectCharges() const override + { + return peerDisconnectsCharges_; + }; + private: std::shared_ptr makeRedirectResponse (PeerFinder::Slot::ptr const& slot, diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0cdc63465..48a55891f 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -222,6 +222,7 @@ PeerImp::charge (Resource::Charge const& fee) usage_.disconnect() && strand_.running_in_this_thread()) { // Sever the connection + overlay_.incPeerDisconnectCharges(); fail("charge: Resources"); } } @@ -414,6 +415,7 @@ PeerImp::close() error_code ec; timer_.cancel(ec); socket_.close(ec); + overlay_.incPeerDisconnect(); if(m_inbound) { JLOG(journal_.debug()) << "Closed"; @@ -1054,10 +1056,10 @@ PeerImp::onMessage (std::shared_ptr const& m) uint256 txID = stx->getTransactionID (); int flags; - constexpr std::chrono::seconds tx_interval = 10s; - if (! app_.getHashRouter ().shouldProcess ( - txID, id_, flags, clock_type::now(), tx_interval)) + + if (! app_.getHashRouter ().shouldProcess (txID, id_, flags, + tx_interval)) { // we have seen this transaction recently if (flags & SF_BAD) @@ -1094,6 +1096,7 @@ PeerImp::onMessage (std::shared_ptr const& m) constexpr int max_transactions = 250; if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions) { + overlay_.incJqTransOverflow(); JLOG(p_journal_.info()) << "Transaction queue is full"; } else if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min) diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index 089d702bc..61c9e3e0a 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -215,6 +215,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe, // Unsubscribe, BookOffers // out: paths/Node, STPathSet, STAmount JSS ( jsonrpc ); // json version +JSS ( jq_trans_overflow ); // JobQueue transaction limit overflow. JSS ( key ); // out: WalletSeed JSS ( key_type ); // in/out: WalletPropose, TransactionSign JSS ( latency ); // out: PeerImp @@ -326,6 +327,9 @@ JSS ( peer ); // in: AccountLines JSS ( peer_authorized ); // out: AccountLines JSS ( peer_id ); // out: RCLCxPeerPos JSS ( peers ); // out: InboundLedger, handlers/Peers, Overlay +JSS ( peer_disconnects ); // Severed peer connection counter. +JSS ( peer_disconnects_resources ); // Severed peer connections because of + // excess resource consumption. JSS ( port ); // in: Connect JSS ( previous_ledger ); // out: LedgerPropose JSS ( proof ); // in: BookOffers diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 85178e08a..0247db65c 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -264,6 +264,24 @@ class HashRouter_test : public beast::unit_test::suite BEAST_EXPECT(!router.shouldRecover(key1)); } + void + testProcess() + { + using namespace std::chrono_literals; + TestStopwatch stopwatch; + HashRouter router(stopwatch, 5s, 5); + uint256 const key(1); + HashRouter::PeerShortID peer = 1; + int flags; + + BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s)); + BEAST_EXPECT(! router.shouldProcess(key, peer, flags, 1s)); + ++stopwatch; + ++stopwatch; + BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s)); + } + + public: void @@ -275,6 +293,7 @@ public: testSetFlags(); testRelay(); testRecover(); + testProcess(); } };