diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp index 73468a33e9..b1c53ca33a 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp @@ -1300,8 +1300,8 @@ void LedgerConsensusImp::addDisputedTransaction ( } } - // If we didn't relay this transaction recently, relay it - if (app_.getHashRouter ().setFlags (txID, SF_RELAYED)) + // If we didn't relay this transaction recently, relay it to all peers + if (app_.getHashRouter ().shouldRelay (txID)) { protocol::TMTransaction msg; msg.set_rawtransaction (& (tx.front ()), tx.size ()); diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 83a9c07a06..fa0dc3f4d6 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -26,35 +26,34 @@ auto HashRouter::emplace (uint256 const& key) -> std::pair { - auto iter = mSuppressionMap.find (key); + auto iter = suppressionMap_.find (key); - if (iter != mSuppressionMap.end ()) + if (iter != suppressionMap_.end ()) { - mSuppressionMap.touch(iter); + suppressionMap_.touch(iter); return std::make_pair( std::ref(iter->second), false); } // See if any supressions need to be expired - expire(mSuppressionMap, - mHoldTime); + expire(suppressionMap_, holdTime_); return std::make_pair(std::ref( - mSuppressionMap.emplace ( + suppressionMap_.emplace ( key, Entry ()).first->second), true); } void HashRouter::addSuppression (uint256 const& key) { - std::lock_guard lock (mMutex); + std::lock_guard lock (mutex_); emplace (key); } bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer) { - std::lock_guard lock (mMutex); + std::lock_guard lock (mutex_); auto result = emplace(key); result.first.addPeer(peer); @@ -63,7 +62,7 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer) bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& flags) { - std::lock_guard lock (mMutex); + std::lock_guard lock (mutex_); auto result = emplace(key); auto& s = result.first; @@ -74,7 +73,7 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& int HashRouter::getFlags (uint256 const& key) { - std::lock_guard lock (mMutex); + std::lock_guard lock (mutex_); return emplace(key).first.getFlags (); } @@ -83,7 +82,7 @@ bool HashRouter::setFlags (uint256 const& key, int flags) { assert (flags != 0); - std::lock_guard lock (mMutex); + std::lock_guard lock (mutex_); auto& s = emplace(key).first; @@ -94,19 +93,18 @@ bool HashRouter::setFlags (uint256 const& key, int flags) return true; } -bool HashRouter::swapSet (uint256 const& key, std::set& peers, int flag) +auto +HashRouter::shouldRelay (uint256 const& key) + -> boost::optional> { - std::lock_guard lock (mMutex); + std::lock_guard lock (mutex_); auto& s = emplace(key).first; - if ((s.getFlags () & flag) == flag) - return false; + if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_)) + return boost::none; - s.swapSet (peers); - s.setFlags (flag); - - return true; + return std::move(s.peekPeers()); } } // ripple diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index a8a6f1ba1a..adc2d3ab5e 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -25,12 +25,12 @@ #include #include #include +#include namespace ripple { // VFALCO NOTE Are these the flags?? Why aren't we using a packed struct? // VFALCO TODO convert these macros to int constants -#define SF_RELAYED 0x01 // Has already been relayed to other nodes // VFALCO NOTE How can both bad and good be set on a hash? #define SF_BAD 0x02 // Temporarily bad #define SF_SAVED 0x04 @@ -68,50 +68,48 @@ private: { } - std::set const& peekPeers () const - { - return peers_; - } - void addPeer (PeerShortID peer) { if (peer != 0) peers_.insert (peer); } - bool hasPeer (PeerShortID peer) const - { - return peers_.count (peer) > 0; - } - int getFlags (void) const { return flags_; } - bool hasFlag (int mask) const - { - return (flags_ & mask) != 0; - } - void setFlags (int flagsToSet) { flags_ |= flagsToSet; } - void clearFlag (int flagsToClear) + std::set & peekPeers() { - flags_ &= ~flagsToClear; + return peers_; } - void swapSet (std::set & other) + /** Determines if this item should be relayed. + + Checks whether the item has been recently relayed. + If it has, return false. If it has not, update the + last relay timestamp and return true. + */ + bool shouldRelay (Stopwatch::time_point const& now, + std::chrono::seconds holdTime) { - peers_.swap (other); + if (relayed_ && *relayed_ + holdTime > now) + return false; + relayed_.emplace(now); + return true; } private: int flags_; std::set peers_; + // This could be generalized to a map, if more + // than one flag needs to expire independently. + boost::optional relayed_; }; public: @@ -123,8 +121,8 @@ public: } HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds) - : mSuppressionMap(clock) - , mHoldTime (entryHoldTimeInSeconds) + : suppressionMap_(clock) + , holdTime_ (entryHoldTimeInSeconds) { } @@ -149,19 +147,31 @@ public: int getFlags (uint256 const& key); - bool swapSet (uint256 const& key, std::set& peers, int flag); + /** Determines whether the hashed item should be relayed. + + Effects: + + If the item should be relayed, this function will not + return `true` again until the hold time has expired. + The internal set of peers will also be reset. + + @return A `boost::optional` set of peers which do not need to be + relayed to. If the result is uninitialized, the item should + _not_ be relayed. + */ + boost::optional> shouldRelay(uint256 const& key); private: // pair.second indicates whether the entry was created std::pair emplace (uint256 const&); - std::mutex mutable mMutex; + std::mutex mutable mutex_; // Stores all suppressed hashes and their expiration time beast::aged_unordered_map> mSuppressionMap; + hardened_hash> suppressionMap_; - std::chrono::seconds const mHoldTime; + std::chrono::seconds const holdTime_; }; } // ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 55822e1e91..27d47045d0 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1059,10 +1059,10 @@ void NetworkOPsImp::apply (std::unique_lock& batchLock) (e.failType != FailHard::yes) && e.local) || (e.result == terQUEUED)) { - std::set peers; + auto const toSkip = app_.getHashRouter().shouldRelay( + e.transaction->getID()); - if (app_.getHashRouter().swapSet ( - e.transaction->getID(), peers, SF_RELAYED)) + if (toSkip) { protocol::TMTransaction tx; Serializer s; @@ -1075,7 +1075,7 @@ void NetworkOPsImp::apply (std::unique_lock& batchLock) // FIXME: This should be when we received it app_.overlay().foreach (send_if_not ( std::make_shared (tx, protocol::mtTRANSACTION), - peer_in_set(peers))); + peer_in_set(*toSkip))); } } } diff --git a/src/ripple/app/misc/impl/TxQ.cpp b/src/ripple/app/misc/impl/TxQ.cpp index 44ea36f72f..d68f8b1870 100644 --- a/src/ripple/app/misc/impl/TxQ.cpp +++ b/src/ripple/app/misc/impl/TxQ.cpp @@ -308,7 +308,7 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view, an early one fail or get dropped. */ canBeHeld = accountIter == byAccount_.end() || - !replacementIter || + replacementIter || accountIter->second.getTxnCount() < setup_.maximumTxnPerAccount; } @@ -521,9 +521,10 @@ TxQ::apply(Application& app, OpenView& view, */ if (std::next(existingIter) != txQAcct.transactions.end()) { - // Only the last tx in the queue should have - // !consequences, and this can't be the last tx. - assert(existingIter->second.consequences); + // Normally, only the last tx in the queue will have + // !consequences, but an expired transaction can be + // replaced, and that replacement won't have it set, + // and that's ok. if (!existingIter->second.consequences) existingIter->second.consequences.emplace( calculateConsequences( diff --git a/src/ripple/app/tests/HashRouter_test.cpp b/src/ripple/app/tests/HashRouter_test.cpp index f92fe48cc6..f4646b17e2 100644 --- a/src/ripple/app/tests/HashRouter_test.cpp +++ b/src/ripple/app/tests/HashRouter_test.cpp @@ -30,8 +30,9 @@ class HashRouter_test : public beast::unit_test::suite void testNonExpiration() { + using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, std::chrono::seconds(2)); + HashRouter router(stopwatch, 2s); uint256 const key1(1); uint256 const key2(2); @@ -66,8 +67,9 @@ class HashRouter_test : public beast::unit_test::suite void testExpiration() { + using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, std::chrono::seconds(2)); + HashRouter router(stopwatch, 2s); uint256 const key1(1); uint256 const key2(2); @@ -144,8 +146,9 @@ class HashRouter_test : public beast::unit_test::suite void testSuppression() { // Normal HashRouter + using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, std::chrono::seconds(2)); + HashRouter router(stopwatch, 2s); uint256 const key1(1); uint256 const key2(2); @@ -173,8 +176,9 @@ class HashRouter_test : public beast::unit_test::suite void testSetFlags() { + using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, std::chrono::seconds(2)); + HashRouter router(stopwatch, 2s); uint256 const key1(1); expect(router.setFlags(key1, 10)); @@ -183,33 +187,46 @@ class HashRouter_test : public beast::unit_test::suite } void - testSwapSet() + testRelay() { + using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, std::chrono::seconds(2)); + HashRouter router(stopwatch, 1s); uint256 const key1(1); - std::set peers1; - std::set peers2; + boost::optional> peers; - peers1.emplace(1); - peers1.emplace(3); - peers1.emplace(5); - peers2.emplace(2); - peers2.emplace(4); - - expect(router.swapSet(key1, peers1, 135)); - expect(peers1.empty()); - expect(router.getFlags(key1) == 135); - // No action, because flag matches - expect(!router.swapSet(key1, peers2, 135)); - expect(peers2.size() == 2); - expect(router.getFlags(key1) == 135); - // Do a swap - expect(router.swapSet(key1, peers2, 24)); - expect(peers2.size() == 3); - expect(router.getFlags(key1) == (135 | 24)); + peers = router.shouldRelay(key1); + expect(peers && peers->empty()); + router.addSuppressionPeer(key1, 1); + router.addSuppressionPeer(key1, 3); + router.addSuppressionPeer(key1, 5); + // No action, because relayed + expect(!router.shouldRelay(key1)); + // Expire, but since the next search will + // be for this entry, it will get refreshed + // instead. However, the relay won't. + ++stopwatch; + // Get those peers we added earlier + peers = router.shouldRelay(key1); + expect(peers && peers->size() == 3); + router.addSuppressionPeer(key1, 2); + router.addSuppressionPeer(key1, 4); + // No action, because relayed + expect(!router.shouldRelay(key1)); + // Expire, but since the next search will + // be for this entry, it will get refreshed + // instead. However, the relay won't. + ++stopwatch; + // Relay again + peers = router.shouldRelay(key1); + expect(peers && peers->size() == 2); + // Expire again + ++stopwatch; + // Confirm that peers list is empty. + peers = router.shouldRelay(key1); + expect(peers && peers->size() == 0); } public: @@ -221,7 +238,7 @@ public: testExpiration(); testSuppression(); testSetFlags(); - testSwapSet(); + testRelay(); } }; diff --git a/src/ripple/app/tests/TxQ_test.cpp b/src/ripple/app/tests/TxQ_test.cpp index af59d360b5..592c679853 100644 --- a/src/ripple/app/tests/TxQ_test.cpp +++ b/src/ripple/app/tests/TxQ_test.cpp @@ -831,6 +831,22 @@ public: // Alice is broke env.require(balance(alice, XRP(0))); env(noop(alice), ter(terINSUF_FEE_B)); + + // Bob tries to queue up more than the single + // account limit (10) txs. + fillQueue(env, bob); + bobSeq = env.seq(bob); + checkMetrics(env, 0, 12, 7, 6, 256); + for (int i = 0; i < 10; ++i) + env(noop(bob), seq(bobSeq + i), queued); + checkMetrics(env, 10, 12, 7, 6, 256); + // Bob hit the single account limit + env(noop(bob), seq(bobSeq + 10), ter(terPRE_SEQ)); + checkMetrics(env, 10, 12, 7, 6, 256); + // Bob can replace one of the earlier txs regardless + // of the limit + env(noop(bob), seq(bobSeq + 5), fee(20), queued); + checkMetrics(env, 10, 12, 7, 6, 256); } void testTieBreaking() @@ -1565,6 +1581,93 @@ public: } } + void testExpirationReplacement() + { + /* This test is based on a reported regression where a + replacement candidate transaction found it's replacee + did not have `consequences` set + + Hypothesis: The queue had '22 through '25. At some point(s), + both the original '22 and '23 expired and were removed from + the queue. A second '22 was submitted, and the multi-tx logic + did not kick in, because it matched the account's sequence + number (a_seq == t_seq). The third '22 was submitted and found + the '22 in the queue did not have consequences. + */ + using namespace jtx; + + Env env(*this, makeConfig({ { "minimum_txn_in_ledger_standalone", "1" }, + {"ledgers_in_queue", "10"}, {"maximum_txn_per_account", "20"} }), + features(featureFeeEscalation)); + + // Alice will recreate the scenario. Bob will block. + auto const alice = Account("alice"); + auto const bob = Account("bob"); + + env.fund(XRP(500000), noripple(alice, bob)); + checkMetrics(env, 0, boost::none, 2, 1, 256); + + auto const aliceSeq = env.seq(alice); + expect(env.current()->info().seq == 3); + env(noop(alice), seq(aliceSeq), json(R"({"LastLedgerSequence":5})"), ter(terQUEUED)); + env(noop(alice), seq(aliceSeq + 1), json(R"({"LastLedgerSequence":5})"), ter(terQUEUED)); + env(noop(alice), seq(aliceSeq + 2), json(R"({"LastLedgerSequence":10})"), ter(terQUEUED)); + env(noop(alice), seq(aliceSeq + 3), json(R"({"LastLedgerSequence":11})"), ter(terQUEUED)); + checkMetrics(env, 4, boost::none, 2, 1, 256); + auto const bobSeq = env.seq(bob); + // Ledger 4 gets 3, + // Ledger 5 gets 4, + // Ledger 6 gets 5. + for (int i = 0; i < 3 + 4 + 5; ++i) + { + env(noop(bob), seq(bobSeq + i), fee(200), ter(terQUEUED)); + } + checkMetrics(env, 4 + 3 + 4 + 5, boost::none, 2, 1, 256); + // Close ledger 3 + env.close(); + checkMetrics(env, 4 + 4 + 5, 20, 3, 2, 256); + // Close ledger 4 + env.close(); + checkMetrics(env, 4 + 5, 30, 4, 3, 256); + // Close ledger 5 + env.close(); + // Alice's first two txs expired. + checkMetrics(env, 2, 40, 5, 4, 256); + + // Because aliceSeq is missing, aliceSeq + 1 fails + env(noop(alice), seq(aliceSeq + 1), ter(terPRE_SEQ)); + + // Queue up a new aliceSeq tx. + // This will only do some of the multiTx validation to + // improve the chances that the orphaned txs can be + // recovered. Because the cost of relaying the later txs + // has already been paid, this tx could potentially be a + // blocker. + env(fset(alice, asfAccountTxnID), seq(aliceSeq), ter(terQUEUED)); + checkMetrics(env, 3, 40, 5, 4, 256); + + // Even though consequences were not computed, we can replace it. + env(noop(alice), seq(aliceSeq), fee(20), ter(terQUEUED)); + checkMetrics(env, 3, 40, 5, 4, 256); + + // Queue up a new aliceSeq + 1 tx. + // This tx will also only do some of the multiTx validation. + env(fset(alice, asfAccountTxnID), seq(aliceSeq + 1), ter(terQUEUED)); + checkMetrics(env, 4, 40, 5, 4, 256); + + // Even though consequences were not computed, we can replace it, + // too. + env(noop(alice), seq(aliceSeq +1), fee(20), ter(terQUEUED)); + checkMetrics(env, 4, 40, 5, 4, 256); + + // Close ledger 6 + env.close(); + // We expect that all of alice's queued tx's got into + // the open ledger. + checkMetrics(env, 0, 50, 4, 5, 256); + expect(env.seq(alice) == aliceSeq + 4); + } + void run() { testQueue(); @@ -1583,6 +1686,7 @@ public: testInFlightBalance(); testConsequences(); testRPC(); + testExpirationReplacement(); } }; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 1c2c60c388..9eddb9ea12 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -747,8 +747,7 @@ OverlayImpl::onManifests ( { // Historical manifests are sent on initial peer connections. // They do not need to be forwarded to other peers. - std::set peers; - hashRouter.swapSet (hash, peers, SF_RELAYED); + hashRouter.shouldRelay (hash); continue; } @@ -757,11 +756,11 @@ OverlayImpl::onManifests ( protocol::TMManifests o; o.add_list ()->set_stobject (s); - std::set peers; - hashRouter.swapSet (hash, peers, SF_RELAYED); - foreach (send_if_not ( - std::make_shared(o, protocol::mtMANIFESTS), - peer_in_set (peers))); + auto const toSkip = hashRouter.shouldRelay (hash); + if(toSkip) + foreach (send_if_not ( + std::make_shared(o, protocol::mtMANIFESTS), + peer_in_set (*toSkip))); } else { @@ -978,15 +977,14 @@ OverlayImpl::relay (protocol::TMProposeSet& m, { if (m.has_hops() && m.hops() >= maxTTL) return; - std::set skip; - if (! app_.getHashRouter().swapSet ( - uid, skip, SF_RELAYED)) + auto const toSkip = app_.getHashRouter().shouldRelay(uid); + if (!toSkip) return; auto const sm = std::make_shared( m, protocol::mtPROPOSE_LEDGER); for_each([&](std::shared_ptr&& p) { - if (skip.find(p->id()) != skip.end()) + if (toSkip->find(p->id()) != toSkip->end()) return; if (! m.has_hops() || p->hopsAware()) p->send(sm); @@ -999,15 +997,14 @@ OverlayImpl::relay (protocol::TMValidation& m, { if (m.has_hops() && m.hops() >= maxTTL) return; - std::set skip; - if (! app_.getHashRouter().swapSet ( - uid, skip, SF_RELAYED)) + auto const toSkip = app_.getHashRouter().shouldRelay(uid); + if (! toSkip) return; auto const sm = std::make_shared( m, protocol::mtVALIDATION); for_each([&](std::shared_ptr&& p) { - if (skip.find(p->id()) != skip.end()) + if (toSkip->find(p->id()) != toSkip->end()) return; if (! m.has_hops() || p->hopsAware()) p->send(sm);