Speed up out of order transaction processing (RIPD-239):

* After successfully applying a transaction to the open ledger, resubmit any held transactions from the same account.
* All held transactions will continue to be retried after consensus round.
This commit is contained in:
Edward Hennis
2015-12-02 19:25:06 -05:00
committed by Nik Bougalis
parent 4d2e7ed404
commit 8be67c1766
5 changed files with 59 additions and 9 deletions

View File

@@ -130,6 +130,16 @@ public:
*/
void applyHeldTransactions ();
/** Get all the transactions held for a particular account.
This is normally called when a transaction for that
account is successfully applied to the open
ledger so those transactions can be resubmitted without
waiting for ledger close.
*/
std::vector<std::shared_ptr<STTx const>>
pruneHeldTransactions(AccountID const& account,
std::uint32_t const seq);
/** Get a ledger's hash by sequence number using the cache
*/
uint256 getHashBySeq (std::uint32_t index);
@@ -251,7 +261,7 @@ private:
Application& app_;
beast::Journal m_journal;
std::recursive_mutex m_mutex;
std::recursive_mutex mutable m_mutex;
// The ledger that most recently closed.
LedgerHolder mClosedLedger;

View File

@@ -324,6 +324,15 @@ LedgerMaster::applyHeldTransactions ()
app_.openLedger().current()->info().parentHash);
}
std::vector<std::shared_ptr<STTx const>>
LedgerMaster::pruneHeldTransactions(AccountID const& account,
std::uint32_t const seq)
{
ScopedLockType sl(m_mutex);
return mHeldTransactions.prune(account, seq);
}
LedgerIndex
LedgerMaster::getBuildingLedger ()
{
@@ -1384,13 +1393,6 @@ LedgerMaster::walkHashBySeq (std::uint32_t index)
return ledgerHash;
}
/** Walk the chain of ledger hashes to determine the hash of the
ledger with the specified index. The referenceLedger is used as
the base of the chain and should be fully validated and must not
precede the target index. This function may throw if nodes
from the reference ledger or any prior ledger are not present
in the node store.
*/
boost::optional<LedgerHash>
LedgerMaster::walkHashBySeq (std::uint32_t index, Ledger::ref referenceLedger)
{

View File

@@ -19,6 +19,7 @@
#include <BeastConfig.h>
#include <ripple/app/misc/CanonicalTXSet.h>
#include <boost/range/adaptor/transformed.hpp>
namespace ripple {
@@ -96,6 +97,33 @@ void CanonicalTXSet::insert (std::shared_ptr<STTx const> const& txn)
txn));
}
std::vector<std::shared_ptr<STTx const>>
CanonicalTXSet::prune(AccountID const& account,
std::uint32_t const seq)
{
auto effectiveAccount = accountKey (account);
Key keyLow(effectiveAccount, seq, zero);
Key keyHigh(effectiveAccount, seq+1, zero);
auto range = boost::make_iterator_range(
mMap.lower_bound(keyLow),
mMap.lower_bound(keyHigh));
auto txRange = boost::adaptors::transform(
range,
[](auto const& p)
{
return p.second;
});
std::vector<std::shared_ptr<STTx const>> result(
txRange.begin(), txRange.end());
mMap.erase(range.begin(), range.end());
return result;
}
CanonicalTXSet::iterator CanonicalTXSet::erase (iterator const& it)
{
iterator tmp = it;

View File

@@ -86,6 +86,9 @@ public:
void insert (std::shared_ptr<STTx const> const& txn);
std::vector<std::shared_ptr<STTx const>>
prune(AccountID const& account, std::uint32_t const seq);
// VFALCO TODO remove this function
void reset (LedgerHash const& saltHash)
{

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/protocol/Quality.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/app/main/Application.h>
@@ -35,7 +36,6 @@
#include <ripple/app/ledger/TransactionMaster.h>
#include <ripple/app/main/LoadManager.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/Transaction.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/misc/Validations.h>
@@ -979,6 +979,13 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
{
m_journal.debug << "Transaction is now included in open ledger";
e.transaction->setStatus (INCLUDED);
auto txCur = e.transaction->getSTransaction();
for (auto const& tx : m_ledgerMaster.pruneHeldTransactions(
txCur->getAccountID(sfAccount), txCur->getSequence() + 1))
{
submitTransaction(tx);
}
}
else if (e.result == tefPAST_SEQ)
{