Improve TxQ edge-case handling logic (RIPD-1200):

* HashRouter: Track relay expiration separately from item lifespan.
** Renamed `swapSet` to `shouldRelay`.
** Cleaned up `HashRouter` member names and removed unused code.
** Removed `SF_RELAYED` flag.
* Fix TxQ edge case replacing dropped transactions.
* Fix TxQ bug in maximumTxnPerAccount check.
This commit is contained in:
Edward Hennis
2016-06-15 15:01:21 -04:00
committed by seelabs
parent a22fa21ce4
commit 177a52473a
8 changed files with 223 additions and 96 deletions

View File

@@ -1300,8 +1300,8 @@ void LedgerConsensusImp::addDisputedTransaction (
}
}
// If we didn't relay this transaction recently, relay it
if (app_.getHashRouter ().setFlags (txID, SF_RELAYED))
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter ().shouldRelay (txID))
{
protocol::TMTransaction msg;
msg.set_rawtransaction (& (tx.front ()), tx.size ());

View File

@@ -26,35 +26,34 @@ auto
HashRouter::emplace (uint256 const& key)
-> std::pair<Entry&, bool>
{
auto iter = mSuppressionMap.find (key);
auto iter = suppressionMap_.find (key);
if (iter != mSuppressionMap.end ())
if (iter != suppressionMap_.end ())
{
mSuppressionMap.touch(iter);
suppressionMap_.touch(iter);
return std::make_pair(
std::ref(iter->second), false);
}
// See if any suppressions need to be expired
expire(mSuppressionMap,
mHoldTime);
expire(suppressionMap_, holdTime_);
return std::make_pair(std::ref(
mSuppressionMap.emplace (
suppressionMap_.emplace (
key, Entry ()).first->second),
true);
}
void HashRouter::addSuppression (uint256 const& key)
{
std::lock_guard <std::mutex> lock (mMutex);
std::lock_guard <std::mutex> lock (mutex_);
emplace (key);
}
bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer)
{
std::lock_guard <std::mutex> lock (mMutex);
std::lock_guard <std::mutex> lock (mutex_);
auto result = emplace(key);
result.first.addPeer(peer);
@@ -63,7 +62,7 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer)
bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& flags)
{
std::lock_guard <std::mutex> lock (mMutex);
std::lock_guard <std::mutex> lock (mutex_);
auto result = emplace(key);
auto& s = result.first;
@@ -74,7 +73,7 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int&
int HashRouter::getFlags (uint256 const& key)
{
std::lock_guard <std::mutex> lock (mMutex);
std::lock_guard <std::mutex> lock (mutex_);
return emplace(key).first.getFlags ();
}
@@ -83,7 +82,7 @@ bool HashRouter::setFlags (uint256 const& key, int flags)
{
assert (flags != 0);
std::lock_guard <std::mutex> lock (mMutex);
std::lock_guard <std::mutex> lock (mutex_);
auto& s = emplace(key).first;
@@ -94,19 +93,18 @@ bool HashRouter::setFlags (uint256 const& key, int flags)
return true;
}
bool HashRouter::swapSet (uint256 const& key, std::set<PeerShortID>& peers, int flag)
auto
HashRouter::shouldRelay (uint256 const& key)
-> boost::optional<std::set<PeerShortID>>
{
std::lock_guard <std::mutex> lock (mMutex);
std::lock_guard <std::mutex> lock (mutex_);
auto& s = emplace(key).first;
if ((s.getFlags () & flag) == flag)
return false;
if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
return boost::none;
s.swapSet (peers);
s.setFlags (flag);
return true;
return std::move(s.peekPeers());
}
} // ripple

View File

@@ -25,12 +25,12 @@
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/UnorderedContainers.h>
#include <ripple/beast/container/aged_unordered_map.h>
#include <boost/optional.hpp>
namespace ripple {
// VFALCO NOTE Are these the flags?? Why aren't we using a packed struct?
// VFALCO TODO convert these macros to int constants
#define SF_RELAYED 0x01 // Has already been relayed to other nodes
// VFALCO NOTE How can both bad and good be set on a hash?
#define SF_BAD 0x02 // Temporarily bad
#define SF_SAVED 0x04
@@ -68,50 +68,48 @@ private:
{
}
std::set <PeerShortID> const& peekPeers () const
{
return peers_;
}
void addPeer (PeerShortID peer)
{
if (peer != 0)
peers_.insert (peer);
}
bool hasPeer (PeerShortID peer) const
{
return peers_.count (peer) > 0;
}
int getFlags (void) const
{
return flags_;
}
bool hasFlag (int mask) const
{
return (flags_ & mask) != 0;
}
void setFlags (int flagsToSet)
{
flags_ |= flagsToSet;
}
void clearFlag (int flagsToClear)
std::set <PeerShortID>& peekPeers()
{
flags_ &= ~flagsToClear;
return peers_;
}
void swapSet (std::set <PeerShortID>& other)
/** Determines if this item should be relayed.
Checks whether the item has been recently relayed.
If it has, return false. If it has not, update the
last relay timestamp and return true.
*/
bool shouldRelay (Stopwatch::time_point const& now,
std::chrono::seconds holdTime)
{
peers_.swap (other);
if (relayed_ && *relayed_ + holdTime > now)
return false;
relayed_.emplace(now);
return true;
}
private:
int flags_;
std::set <PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
};
public:
@@ -123,8 +121,8 @@ public:
}
HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
: mSuppressionMap(clock)
, mHoldTime (entryHoldTimeInSeconds)
: suppressionMap_(clock)
, holdTime_ (entryHoldTimeInSeconds)
{
}
@@ -149,19 +147,31 @@ public:
int getFlags (uint256 const& key);
bool swapSet (uint256 const& key, std::set<PeerShortID>& peers, int flag);
/** Determines whether the hashed item should be relayed.
Effects:
If the item should be relayed, this function will not
return an initialized result (a set of peers) again until
the hold time has expired.
The internal set of peers will also be reset.
@return A `boost::optional` set of peers which do not need to be
relayed to. If the result is uninitialized, the item should
_not_ be relayed.
*/
boost::optional<std::set<PeerShortID>> shouldRelay(uint256 const& key);
private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool> emplace (uint256 const&);
std::mutex mutable mMutex;
std::mutex mutable mutex_;
// Stores all suppressed hashes and their expiration time
beast::aged_unordered_map<uint256, Entry, Stopwatch::clock_type,
hardened_hash<strong_hash>> mSuppressionMap;
hardened_hash<strong_hash>> suppressionMap_;
std::chrono::seconds const mHoldTime;
std::chrono::seconds const holdTime_;
};
} // ripple

View File

@@ -1059,10 +1059,10 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
(e.failType != FailHard::yes) && e.local) ||
(e.result == terQUEUED))
{
std::set<Peer::id_t> peers;
auto const toSkip = app_.getHashRouter().shouldRelay(
e.transaction->getID());
if (app_.getHashRouter().swapSet (
e.transaction->getID(), peers, SF_RELAYED))
if (toSkip)
{
protocol::TMTransaction tx;
Serializer s;
@@ -1075,7 +1075,7 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
// FIXME: This should be when we received it
app_.overlay().foreach (send_if_not (
std::make_shared<Message> (tx, protocol::mtTRANSACTION),
peer_in_set(peers)));
peer_in_set(*toSkip)));
}
}
}

View File

@@ -308,7 +308,7 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view,
an early one fail or get dropped.
*/
canBeHeld = accountIter == byAccount_.end() ||
!replacementIter ||
replacementIter ||
accountIter->second.getTxnCount() <
setup_.maximumTxnPerAccount;
}
@@ -521,9 +521,10 @@ TxQ::apply(Application& app, OpenView& view,
*/
if (std::next(existingIter) != txQAcct.transactions.end())
{
// Only the last tx in the queue should have
// !consequences, and this can't be the last tx.
assert(existingIter->second.consequences);
// Normally, only the last tx in the queue will have
// !consequences, but an expired transaction can be
// replaced, and that replacement won't have it set,
// and that's ok.
if (!existingIter->second.consequences)
existingIter->second.consequences.emplace(
calculateConsequences(

View File

@@ -30,8 +30,9 @@ class HashRouter_test : public beast::unit_test::suite
void
testNonExpiration()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, std::chrono::seconds(2));
HashRouter router(stopwatch, 2s);
uint256 const key1(1);
uint256 const key2(2);
@@ -66,8 +67,9 @@ class HashRouter_test : public beast::unit_test::suite
void
testExpiration()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, std::chrono::seconds(2));
HashRouter router(stopwatch, 2s);
uint256 const key1(1);
uint256 const key2(2);
@@ -144,8 +146,9 @@ class HashRouter_test : public beast::unit_test::suite
void testSuppression()
{
// Normal HashRouter
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, std::chrono::seconds(2));
HashRouter router(stopwatch, 2s);
uint256 const key1(1);
uint256 const key2(2);
@@ -173,8 +176,9 @@ class HashRouter_test : public beast::unit_test::suite
void
testSetFlags()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, std::chrono::seconds(2));
HashRouter router(stopwatch, 2s);
uint256 const key1(1);
expect(router.setFlags(key1, 10));
@@ -183,33 +187,46 @@ class HashRouter_test : public beast::unit_test::suite
}
void
testSwapSet()
testRelay()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, std::chrono::seconds(2));
HashRouter router(stopwatch, 1s);
uint256 const key1(1);
std::set<HashRouter::PeerShortID> peers1;
std::set<HashRouter::PeerShortID> peers2;
boost::optional<std::set<HashRouter::PeerShortID>> peers;
peers1.emplace(1);
peers1.emplace(3);
peers1.emplace(5);
peers2.emplace(2);
peers2.emplace(4);
expect(router.swapSet(key1, peers1, 135));
expect(peers1.empty());
expect(router.getFlags(key1) == 135);
// No action, because flag matches
expect(!router.swapSet(key1, peers2, 135));
expect(peers2.size() == 2);
expect(router.getFlags(key1) == 135);
// Do a swap
expect(router.swapSet(key1, peers2, 24));
expect(peers2.size() == 3);
expect(router.getFlags(key1) == (135 | 24));
peers = router.shouldRelay(key1);
expect(peers && peers->empty());
router.addSuppressionPeer(key1, 1);
router.addSuppressionPeer(key1, 3);
router.addSuppressionPeer(key1, 5);
// No action, because relayed
expect(!router.shouldRelay(key1));
// Expire, but since the next search will
// be for this entry, it will get refreshed
// instead. However, the relay won't.
++stopwatch;
// Get those peers we added earlier
peers = router.shouldRelay(key1);
expect(peers && peers->size() == 3);
router.addSuppressionPeer(key1, 2);
router.addSuppressionPeer(key1, 4);
// No action, because relayed
expect(!router.shouldRelay(key1));
// Expire, but since the next search will
// be for this entry, it will get refreshed
// instead. However, the relay won't.
++stopwatch;
// Relay again
peers = router.shouldRelay(key1);
expect(peers && peers->size() == 2);
// Expire again
++stopwatch;
// Confirm that peers list is empty.
peers = router.shouldRelay(key1);
expect(peers && peers->size() == 0);
}
public:
@@ -221,7 +238,7 @@ public:
testExpiration();
testSuppression();
testSetFlags();
testSwapSet();
testRelay();
}
};

View File

@@ -831,6 +831,22 @@ public:
// Alice is broke
env.require(balance(alice, XRP(0)));
env(noop(alice), ter(terINSUF_FEE_B));
// Bob tries to queue up more than the single
// account limit (10) txs.
fillQueue(env, bob);
bobSeq = env.seq(bob);
checkMetrics(env, 0, 12, 7, 6, 256);
for (int i = 0; i < 10; ++i)
env(noop(bob), seq(bobSeq + i), queued);
checkMetrics(env, 10, 12, 7, 6, 256);
// Bob hit the single account limit
env(noop(bob), seq(bobSeq + 10), ter(terPRE_SEQ));
checkMetrics(env, 10, 12, 7, 6, 256);
// Bob can replace one of the earlier txs regardless
// of the limit
env(noop(bob), seq(bobSeq + 5), fee(20), queued);
checkMetrics(env, 10, 12, 7, 6, 256);
}
void testTieBreaking()
@@ -1565,6 +1581,93 @@ public:
}
}
// Regression test: after queued transactions expire, a newly queued
// transaction at the same sequence may not have `consequences` computed.
// Verifies that such a transaction can itself still be replaced, and that
// all of alice's queued transactions eventually leave the queue.
void testExpirationReplacement()
{
/* This test is based on a reported regression where a
replacement candidate transaction found its replacee
did not have `consequences` set.
Hypothesis: The queue had '22 through '25. At some point(s),
both the original '22 and '23 expired and were removed from
the queue. A second '22 was submitted, and the multi-tx logic
did not kick in, because it matched the account's sequence
number (a_seq == t_seq). The third '22 was submitted and found
the '22 in the queue did not have consequences.
*/
using namespace jtx;
// Config overrides used by this test: minimum_txn_in_ledger_standalone = 1,
// ledgers_in_queue = 10, maximum_txn_per_account = 20.
Env env(*this, makeConfig({ { "minimum_txn_in_ledger_standalone", "1" },
{"ledgers_in_queue", "10"}, {"maximum_txn_per_account", "20"} }),
features(featureFeeEscalation));
// Alice will recreate the scenario. Bob will block.
auto const alice = Account("alice");
auto const bob = Account("bob");
env.fund(XRP(500000), noripple(alice, bob));
checkMetrics(env, 0, boost::none, 2, 1, 256);
auto const aliceSeq = env.seq(alice);
// The LastLedgerSequence values below are relative to this
// starting ledger sequence.
expect(env.current()->info().seq == 3);
// Queue four consecutive txs for alice. The first two carry
// LastLedgerSequence 5; they are the ones expected to have
// expired once ledger 5 is closed (checked further down).
env(noop(alice), seq(aliceSeq), json(R"({"LastLedgerSequence":5})"), ter(terQUEUED));
env(noop(alice), seq(aliceSeq + 1), json(R"({"LastLedgerSequence":5})"), ter(terQUEUED));
env(noop(alice), seq(aliceSeq + 2), json(R"({"LastLedgerSequence":10})"), ter(terQUEUED));
env(noop(alice), seq(aliceSeq + 3), json(R"({"LastLedgerSequence":11})"), ter(terQUEUED));
checkMetrics(env, 4, boost::none, 2, 1, 256);
auto const bobSeq = env.seq(bob);
// Bob queues 3 + 4 + 5 txs with fee(200), one batch per upcoming ledger:
// Ledger 4 gets 3,
// Ledger 5 gets 4,
// Ledger 6 gets 5.
// NOTE(review): presumably the higher fee is what lets bob's txs go
// ahead of alice's ("Bob will block") - confirm against TxQ ordering.
for (int i = 0; i < 3 + 4 + 5; ++i)
{
env(noop(bob), seq(bobSeq + i), fee(200), ter(terQUEUED));
}
checkMetrics(env, 4 + 3 + 4 + 5, boost::none, 2, 1, 256);
// Close ledger 3
env.close();
checkMetrics(env, 4 + 4 + 5, 20, 3, 2, 256);
// Close ledger 4
env.close();
checkMetrics(env, 4 + 5, 30, 4, 3, 256);
// Close ledger 5
env.close();
// Alice's first two txs expired.
checkMetrics(env, 2, 40, 5, 4, 256);
// Because aliceSeq is missing, aliceSeq + 1 fails
env(noop(alice), seq(aliceSeq + 1), ter(terPRE_SEQ));
// Queue up a new aliceSeq tx.
// This will only do some of the multiTx validation to
// improve the chances that the orphaned txs can be
// recovered. Because the cost of relaying the later txs
// has already been paid, this tx could potentially be a
// blocker.
env(fset(alice, asfAccountTxnID), seq(aliceSeq), ter(terQUEUED));
checkMetrics(env, 3, 40, 5, 4, 256);
// Even though consequences were not computed, we can replace it.
// The metrics are expected to be unchanged by the replacement.
env(noop(alice), seq(aliceSeq), fee(20), ter(terQUEUED));
checkMetrics(env, 3, 40, 5, 4, 256);
// Queue up a new aliceSeq + 1 tx.
// This tx will also only do some of the multiTx validation.
env(fset(alice, asfAccountTxnID), seq(aliceSeq + 1), ter(terQUEUED));
checkMetrics(env, 4, 40, 5, 4, 256);
// Even though consequences were not computed, we can replace it,
// too.
env(noop(alice), seq(aliceSeq +1), fee(20), ter(terQUEUED));
checkMetrics(env, 4, 40, 5, 4, 256);
// Close ledger 6
env.close();
// We expect that all of alice's queued tx's got into
// the open ledger, advancing her sequence past all four.
checkMetrics(env, 0, 50, 4, 5, 256);
expect(env.seq(alice) == aliceSeq + 4);
}
void run()
{
testQueue();
@@ -1583,6 +1686,7 @@ public:
testInFlightBalance();
testConsequences();
testRPC();
testExpirationReplacement();
}
};

View File

@@ -747,8 +747,7 @@ OverlayImpl::onManifests (
{
// Historical manifests are sent on initial peer connections.
// They do not need to be forwarded to other peers.
std::set<Peer::id_t> peers;
hashRouter.swapSet (hash, peers, SF_RELAYED);
hashRouter.shouldRelay (hash);
continue;
}
@@ -757,11 +756,11 @@ OverlayImpl::onManifests (
protocol::TMManifests o;
o.add_list ()->set_stobject (s);
std::set<Peer::id_t> peers;
hashRouter.swapSet (hash, peers, SF_RELAYED);
foreach (send_if_not (
std::make_shared<Message>(o, protocol::mtMANIFESTS),
peer_in_set (peers)));
auto const toSkip = hashRouter.shouldRelay (hash);
if(toSkip)
foreach (send_if_not (
std::make_shared<Message>(o, protocol::mtMANIFESTS),
peer_in_set (*toSkip)));
}
else
{
@@ -978,15 +977,14 @@ OverlayImpl::relay (protocol::TMProposeSet& m,
{
if (m.has_hops() && m.hops() >= maxTTL)
return;
std::set<Peer::id_t> skip;
if (! app_.getHashRouter().swapSet (
uid, skip, SF_RELAYED))
auto const toSkip = app_.getHashRouter().shouldRelay(uid);
if (!toSkip)
return;
auto const sm = std::make_shared<Message>(
m, protocol::mtPROPOSE_LEDGER);
for_each([&](std::shared_ptr<PeerImp>&& p)
{
if (skip.find(p->id()) != skip.end())
if (toSkip->find(p->id()) != toSkip->end())
return;
if (! m.has_hops() || p->hopsAware())
p->send(sm);
@@ -999,15 +997,14 @@ OverlayImpl::relay (protocol::TMValidation& m,
{
if (m.has_hops() && m.hops() >= maxTTL)
return;
std::set<Peer::id_t> skip;
if (! app_.getHashRouter().swapSet (
uid, skip, SF_RELAYED))
auto const toSkip = app_.getHashRouter().shouldRelay(uid);
if (! toSkip)
return;
auto const sm = std::make_shared<Message>(
m, protocol::mtVALIDATION);
for_each([&](std::shared_ptr<PeerImp>&& p)
{
if (skip.find(p->id()) != skip.end())
if (toSkip->find(p->id()) != toSkip->end())
return;
if (! m.has_hops() || p->hopsAware())
p->send(sm);