Batched transaction application:

Applying multiple transactions to the open ledger
reduces SHAMap modification overhead.
This commit is contained in:
Mark Travis
2015-05-29 00:09:44 -07:00
committed by Nik Bougalis
parent 4225b78bf5
commit ca800f9e8d
11 changed files with 370 additions and 172 deletions

View File

@@ -58,8 +58,7 @@ void ConsensusTransSetSF::gotNode (bool fromFilter, const SHAMapNodeID& id, uint
getApp().getJobQueue ().addJob (
jtTRANSACTION, "TXS->TXN",
std::bind (&NetworkOPs::submitTransaction, &getApp().getOPs (),
std::placeholders::_1, stx,
NetworkOPs::stCallback ()));
std::placeholders::_1, stx));
}
catch (...)
{

View File

@@ -21,6 +21,7 @@
#define RIPPLE_APP_LEDGER_LEDGERMASTER_H_INCLUDED
#include <ripple/app/ledger/LedgerEntrySet.h>
#include <ripple/app/ledger/LedgerHolder.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/core/Config.h>
@@ -62,6 +63,9 @@ public:
// The current ledger is the ledger we believe new transactions should go in
virtual Ledger::pointer getCurrentLedger () = 0;
// The holder for the current ledger
virtual LedgerHolder& getCurrentLedgerHolder() = 0;
// The finalized ledger is the last closed/accepted ledger
virtual Ledger::pointer getClosedLedger () = 0;
@@ -75,10 +79,6 @@ public:
virtual int getValidatedLedgerAge () = 0;
virtual bool isCaughtUp(std::string& reason) = 0;
virtual TER doTransaction (
STTx::ref txn,
TransactionEngineParams params, bool& didApply) = 0;
virtual int getMinValidations () = 0;
virtual void setMinValidations (int v) = 0;
@@ -148,7 +148,7 @@ public:
virtual beast::PropertyStream::Source& getPropertySource () = 0;
static bool shouldAcquire (std::uint32_t currentLedgerID,
static bool shouldAcquire (std::uint32_t currentLedgerID,
std::uint32_t ledgerHistory, std::uint32_t ledgerHistoryIndex,
std::uint32_t targetLedger);

View File

@@ -21,7 +21,6 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerHistory.h>
#include <ripple/app/ledger/LedgerHolder.h>
#include <ripple/app/ledger/OrderBookDB.h>
#include <ripple/app/ledger/PendingSaves.h>
#include <ripple/app/ledger/impl/LedgerCleaner.h>
@@ -398,27 +397,6 @@ public:
mBuildingLedgerSeq.store (i);
}
TER doTransaction (STTx::ref txn, TransactionEngineParams params, bool& didApply)
{
Ledger::pointer ledger;
TransactionEngine engine;
TER result;
didApply = false;
{
ScopedLockType sl (m_mutex);
ledger = mCurrentLedger.getMutable ();
engine.setLedger (ledger);
std::tie(result, didApply) = engine.applyTransaction (*txn, params);
}
if (didApply)
{
mCurrentLedger.set (ledger);
getApp().getOPs ().pubProposedTransaction (ledger, txn, result);
}
return result;
}
bool haveLedgerRange (std::uint32_t from, std::uint32_t to)
{
ScopedLockType sl (mCompleteLock);
@@ -1371,6 +1349,11 @@ public:
return mCurrentLedger.get ();
}
LedgerHolder& getCurrentLedgerHolder() override
{
return mCurrentLedger;
}
// The finalized ledger is the last closed/accepted ledger
Ledger::pointer getClosedLedger ()
{

View File

@@ -65,6 +65,7 @@
#include <beast/utility/make_lock.h>
#include <boost/optional.hpp>
#include <tuple>
#include <condition_variable>
namespace ripple {
@@ -72,12 +73,39 @@ class NetworkOPsImp final
: public NetworkOPs
, public beast::DeadlineTimer::Listener
{
public:
enum Fault
/**
* Transaction with input flags and results to be applied in batches.
*/
class TransactionStatus
{
// Exceptions these functions can throw.
IO_ERROR = 1,
NO_NETWORK = 2,
public:
Transaction::pointer transaction;
bool admin;
bool local;
FailHard failType;
bool applied;
TER result;
TransactionStatus (
Transaction::pointer t,
bool a,
bool l,
FailHard f)
: transaction (t)
, admin (a)
, local (l)
, failType (f)
{}
};
/**
* Synchronization states for transaction batches.
*/
enum class DispatchState : unsigned char
{
none,
scheduled,
running,
};
public:
@@ -207,35 +235,60 @@ public:
//
// Must complete immediately.
using stCallback = std::function<void (Transaction::pointer, TER)>;
void submitTransaction (
Job&, STTx::pointer,
stCallback callback = stCallback ()) override;
void submitTransaction (Job&, STTx::pointer) override;
Transaction::pointer processTransactionCb (
Transaction::pointer,
bool bAdmin, bool bLocal, FailHard failType, stCallback) override;
Transaction::pointer processTransaction (
void processTransaction (
Transaction::pointer transaction,
bool bAdmin, bool bLocal, FailHard failType) override
{
return processTransactionCb (
transaction, bAdmin, bLocal, failType, stCallback ());
}
bool bAdmin, bool bLocal, FailHard failType) override;
// VFALCO Workaround for MSVC std::function which doesn't swallow return
// types.
//
private:
void processTransactionCbVoid (
Transaction::pointer p,
bool bAdmin, bool bLocal, FailHard failType, stCallback cb)
{
processTransactionCb (p, bAdmin, bLocal, failType, cb);
}
/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
*
* @param transaction Transaction object.
* @param bAdmin Whether an administrative client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void doTransactionSync (Transaction::pointer transaction,
bool bAdmin, FailHard failType);
/**
* For transactions not submitted by a locally connected client, fire and
* forget. Add to batch and trigger it to be processed if there's no batch
* currently being applied.
*
* @param transaction Transaction object
* @param bAdmin Whether an administrative client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void doTransactionAsync (Transaction::pointer transaction,
bool bAdmin, FailHard failtype);
/**
* Apply transactions in batches. Continue until none are queued.
*/
void transactionBatch();
/**
* Attempt to apply transactions and post-process based on the results.
*
* @param Lock that protects the transaction batching
*/
void apply (std::unique_lock<std::mutex>& lock);
/**
* Apply each transaction to open ledger.
*
* @param ledger Open ledger.
* @param engine Engine that applies transactions to open ledger.
* @param transactions Batch of transactions to apply.
* @return Whether any transactions in batch succeeded.
*/
bool batchApply (
Ledger::pointer& ledger,
TransactionEngine& engine,
std::vector<TransactionStatus>& transactions);
public:
Transaction::pointer findTransactionByID (
uint256 const& transactionID) override;
@@ -594,6 +647,12 @@ private:
// The number of nodes that we need to consider ourselves connected.
std::size_t const m_network_quorum;
// Transaction batching.
std::condition_variable mCond;
std::mutex mMutex;
DispatchState mDispatchState = DispatchState::none;
std::vector <TransactionStatus> mTransactions;
};
//------------------------------------------------------------------------------
@@ -865,8 +924,7 @@ bool NetworkOPsImp::isValidated (std::uint32_t seq)
seq <= m_ledgerMaster.getValidatedLedger ()->getLedgerSeq ();
}
void NetworkOPsImp::submitTransaction (
Job&, STTx::pointer iTrans, stCallback callback)
void NetworkOPsImp::submitTransaction (Job&, STTx::pointer iTrans)
{
if (isNeedNetworkLedger ())
{
@@ -923,28 +981,26 @@ void NetworkOPsImp::submitTransaction (
}
m_job_queue.addJob (jtTRANSACTION, "submitTxn",
std::bind (&NetworkOPsImp::processTransactionCbVoid,
std::bind (&NetworkOPsImp::processTransaction,
this,
std::make_shared<Transaction> (trans, Validate::NO, reason),
false,
false,
FailHard::no,
callback));
FailHard::no));
}
Transaction::pointer NetworkOPsImp::processTransactionCb (
Transaction::pointer trans,
bool bAdmin, bool bLocal, FailHard failType, stCallback callback)
void NetworkOPsImp::processTransaction (Transaction::pointer transaction,
bool bAdmin, bool bLocal, FailHard failType)
{
auto ev = m_job_queue.getLoadEventAP (jtTXN_PROC, "ProcessTXN");
int newFlags = getApp().getHashRouter ().getFlags (trans->getID ());
int newFlags = getApp().getHashRouter ().getFlags (transaction->getID ());
if ((newFlags & SF_BAD) != 0)
{
// cached bad
trans->setStatus (INVALID);
trans->setResult (temBAD_SIGNATURE);
return trans;
transaction->setStatus (INVALID);
transaction->setResult (temBAD_SIGNATURE);
return;
}
if ((newFlags & SF_SIGGOOD) == 0)
@@ -952,112 +1008,236 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
// signature not checked
std::string reason;
if (! trans->checkSign (reason))
if (! transaction->checkSign (reason))
{
m_journal.info << "Transaction has bad signature: " << reason;
trans->setStatus (INVALID);
trans->setResult (temBAD_SIGNATURE);
getApp().getHashRouter ().setFlag (trans->getID (), SF_BAD);
return trans;
transaction->setStatus (INVALID);
transaction->setResult (temBAD_SIGNATURE);
getApp().getHashRouter ().setFlag (transaction->getID (), SF_BAD);
return;
}
getApp().getHashRouter ().setFlag (trans->getID (), SF_SIGGOOD);
}
getApp().getHashRouter ().setFlag (transaction->getID (), SF_SIGGOOD);
if (bLocal)
doTransactionSync (transaction, bAdmin, failType);
else
doTransactionAsync (transaction, bAdmin, failType);
}
void NetworkOPsImp::doTransactionAsync (Transaction::pointer transaction,
bool bAdmin, FailHard failType)
{
std::lock_guard<std::mutex> lock (mMutex);
if (transaction->getApplying())
return;
mTransactions.push_back (TransactionStatus (transaction, bAdmin, false,
failType));
transaction->setApplying();
if (mDispatchState == DispatchState::none)
{
m_job_queue.addJob (jtBATCH, "transactionBatch",
std::bind (&NetworkOPsImp::transactionBatch, this));
mDispatchState = DispatchState::scheduled;
}
}
void NetworkOPsImp::doTransactionSync (Transaction::pointer transaction,
bool bAdmin, FailHard failType)
{
std::unique_lock<std::mutex> lock (mMutex);
if (! transaction->getApplying())
{
mTransactions.push_back (TransactionStatus (transaction, bAdmin, true,
failType));
transaction->setApplying();
}
do
{
if (mDispatchState == DispatchState::running)
{
// A batch processing job is already running, so wait.
mCond.wait (lock);
}
else
{
apply (lock);
if (mTransactions.size())
{
// More transactions need to be applied, but by another job.
m_job_queue.addJob (jtBATCH, "transactionBatch",
std::bind (&NetworkOPsImp::transactionBatch, this));
mDispatchState = DispatchState::scheduled;
}
}
}
while (transaction->getApplying());
}
void NetworkOPsImp::transactionBatch()
{
std::unique_lock<std::mutex> lock (mMutex);
if (mDispatchState == DispatchState::running)
return;
while (mTransactions.size())
{
apply (lock);
}
}
void NetworkOPsImp::apply (std::unique_lock<std::mutex>& lock)
{
std::vector<TransactionStatus> transactions;
mTransactions.swap (transactions);
assert (! transactions.empty());
assert (mDispatchState != DispatchState::running);
mDispatchState = DispatchState::running;
lock.unlock();
Ledger::pointer ledger;
TransactionEngine engine;
{
auto lock = beast::make_lock(getApp().getMasterMutex());
bool didApply;
TER r = m_ledgerMaster.doTransaction (
trans->getSTransaction (),
bAdmin ? (tapOPEN_LEDGER | tapNO_CHECK_SIGN | tapADMIN)
: (tapOPEN_LEDGER | tapNO_CHECK_SIGN), didApply);
trans->setResult (r);
if (isTemMalformed (r)) // malformed, cache bad
getApp().getHashRouter ().setFlag (trans->getID (), SF_BAD);
#ifdef BEAST_DEBUG
if (r != tesSUCCESS)
if (batchApply (ledger, engine, transactions))
{
std::string token, human;
if (transResultInfo (r, token, human))
m_journal.info << "TransactionResult: "
<< token << ": " << human;
ledger->setImmutable();
m_ledgerMaster.getCurrentLedgerHolder().set (ledger);
}
#endif
if (callback)
callback (trans, r);
if (r == tefFAILURE)
throw Fault (IO_ERROR);
bool addLocal = bLocal;
if (r == tesSUCCESS)
for (TransactionStatus& e : transactions)
{
m_journal.debug << "Transaction is now included in open ledger";
trans->setStatus (INCLUDED);
if (e.applied)
{
pubProposedTransaction (ledger,
e.transaction->getSTransaction(), e.result);
}
// VFALCO NOTE The value of trans can be changed here!
getApp().getMasterTransaction ().canonicalize (&trans);
}
else if (r == tefPAST_SEQ)
{
// duplicate or conflict
m_journal.info << "Transaction is obsolete";
trans->setStatus (OBSOLETE);
}
else if (isTerRetry (r))
{
if (failType == FailHard::yes)
addLocal = false;
e.transaction->setResult (e.result);
if (isTemMalformed (e.result))
getApp().getHashRouter().setFlag (e.transaction->getID(), SF_BAD);
#ifdef BEAST_DEBUG
if (e.result != tesSUCCESS)
{
std::string token, human;
if (transResultInfo (e.result, token, human))
m_journal.info << "TransactionResult: "
<< token << ": " << human;
}
#endif
bool addLocal = e.local;
if (e.result == tesSUCCESS)
{
m_journal.debug << "Transaction is now included in open ledger";
e.transaction->setStatus (INCLUDED);
// VFALCO NOTE The value of trans can be changed here!
getApp().getMasterTransaction ().canonicalize (&e.transaction);
}
else if (e.result == tefPAST_SEQ)
{
// duplicate or conflict
m_journal.info << "Transaction is obsolete";
e.transaction->setStatus (OBSOLETE);
}
else if (isTerRetry (e.result))
{
if (e.failType == FailHard::yes)
{
addLocal = false;
}
else
{
// transaction should be held
m_journal.debug << "Transaction should be held: " << e.result;
e.transaction->setStatus (HELD);
getApp().getMasterTransaction().canonicalize (&e.transaction);
m_ledgerMaster.addHeldTransaction (e.transaction);
}
}
else
{
// transaction should be held
m_journal.debug << "Transaction should be held: " << r;
trans->setStatus (HELD);
getApp().getMasterTransaction ().canonicalize (&trans);
m_ledgerMaster.addHeldTransaction (trans);
m_journal.debug << "Status other than success " << e.result;
e.transaction->setStatus (INVALID);
}
}
else
{
m_journal.debug << "Status other than success " << r;
trans->setStatus (INVALID);
}
if (addLocal)
{
addLocalTx (m_ledgerMaster.getCurrentLedger (),
trans->getSTransaction ());
}
if (didApply ||
((mMode != omFULL) && (failType != FailHard::yes) && bLocal))
{
std::set<Peer::id_t> peers;
if (getApp().getHashRouter ().swapSet (
trans->getID (), peers, SF_RELAYED))
if (addLocal)
{
protocol::TMTransaction tx;
Serializer s;
trans->getSTransaction ()->add (s);
tx.set_rawtransaction (&s.getData ().front (), s.getLength ());
tx.set_status (protocol::tsCURRENT);
tx.set_receivetimestamp (getNetworkTimeNC ());
// FIXME: This should be when we received it
getApp ().overlay ().foreach (send_if_not (
std::make_shared<Message> (tx, protocol::mtTRANSACTION),
peer_in_set(peers)));
addLocalTx (m_ledgerMaster.getCurrentLedger(),
e.transaction->getSTransaction());
}
if (e.applied ||
((mMode != omFULL) && (e.failType != FailHard::yes) && e.local))
{
std::set<Peer::id_t> peers;
if (getApp().getHashRouter().swapSet (
e.transaction->getID(), peers, SF_RELAYED))
{
protocol::TMTransaction tx;
Serializer s;
e.transaction->getSTransaction()->add (s);
tx.set_rawtransaction (&s.getData().front(), s.getLength());
tx.set_status (protocol::tsCURRENT);
tx.set_receivetimestamp (getApp().getOPs().getNetworkTimeNC());
// FIXME: This should be when we received it
getApp().overlay().foreach (send_if_not (
std::make_shared<Message> (tx, protocol::mtTRANSACTION),
peer_in_set(peers)));
}
}
}
}
return trans;
lock.lock();
for (TransactionStatus& e : transactions)
e.transaction->clearApplying();
mCond.notify_all();
mDispatchState = DispatchState::none;
}
bool NetworkOPsImp::batchApply (Ledger::pointer& ledger,
TransactionEngine& engine,
std::vector<TransactionStatus>& transactions)
{
bool applied = false;
std::lock_guard <std::recursive_mutex> lock (m_ledgerMaster.peekMutex());
ledger = m_ledgerMaster.getCurrentLedgerHolder().getMutable();
engine.setLedger (ledger);
for (TransactionStatus& e : transactions)
{
std::tie (e.result, e.applied) = engine.applyTransaction (
*e.transaction->getSTransaction(),
e.admin ? (tapOPEN_LEDGER | tapNO_CHECK_SIGN | tapADMIN) : (
tapOPEN_LEDGER | tapNO_CHECK_SIGN));
applied |= e.applied;
}
return applied;
}
Transaction::pointer NetworkOPsImp::findTransactionByID (

View File

@@ -157,11 +157,18 @@ public:
// must complete immediately
// VFALCO TODO Make this a TxCallback structure
using stCallback = std::function<void (Transaction::pointer, TER)>;
virtual void submitTransaction (Job&, STTx::pointer,
stCallback callback = stCallback ()) = 0;
virtual Transaction::pointer processTransactionCb (Transaction::pointer,
bool bAdmin, bool bLocal, FailHard failType, stCallback) = 0;
virtual Transaction::pointer processTransaction (Transaction::pointer transaction,
virtual void submitTransaction (Job&, STTx::pointer) = 0;
/**
* Process transactions as they arrive from the network or which are
* submitted by clients. Process local transactions synchronously
*
* @param transaction Transaction object
* @param bAdmin Whether an administrative client connection submitted it.
* @param bLocal Client submission.
* @param failType fail_hard setting from transaction submission.
*/
virtual void processTransaction (Transaction::pointer transaction,
bool bAdmin, bool bLocal, FailHard failType) = 0;
virtual Transaction::pointer findTransactionByID (uint256 const& transactionID) = 0;
virtual int findTransactionsByDestination (std::list<Transaction::pointer>&,

View File

@@ -79,7 +79,7 @@ public:
static
TransStatus
sqlTransactionStatus(boost::optional<std::string> const& status);
bool checkSign (std::string&) const;
STTx::ref getSTransaction ()
@@ -124,6 +124,32 @@ public:
mInLedger = ledger;
}
/**
* Set this flag once added to a batch.
*/
void setApplying()
{
mApplying = true;
}
/**
* Detect if transaction is being batched.
*
* @return Whether transaction is being applied within a batch.
*/
bool getApplying()
{
return mApplying;
}
/**
* Indicate that transaction application has been attempted.
*/
void clearApplying()
{
mApplying = false;
}
Json::Value getJson (int options, bool binary = false) const;
static Transaction::pointer load (uint256 const& id);
@@ -134,9 +160,10 @@ private:
RippleAddress mFromPubKey; // Sign transaction with this. mSignPubKey
RippleAddress mSourcePrivate; // Sign transaction with this.
LedgerIndex mInLedger;
TransStatus mStatus;
TER mResult;
LedgerIndex mInLedger = 0;
TransStatus mStatus = INVALID;
TER mResult = temUNCERTAIN;
bool mApplying = false;
STTx::pointer mTransaction;
};

View File

@@ -30,10 +30,7 @@ namespace ripple {
Transaction::Transaction (STTx::ref sit, Validate validate, std::string& reason)
noexcept
: mInLedger (0),
mStatus (INVALID),
mResult (temUNCERTAIN),
mTransaction (sit)
: mTransaction (sit)
{
try
{

View File

@@ -48,6 +48,7 @@ enum JobType
jtRPC, // A websocket command from the client
jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION, // A transaction received from the network
jtBATCH, // Apply batched transactions
jtUNL, // A Score or Fetch of the UNL (DEPRECATED)
jtADVANCE, // Advance validated/acquired ledgers
jtPUBLEDGER, // Publish a fully-accepted ledger

View File

@@ -82,6 +82,10 @@ public:
add (jtTRANSACTION, "transaction",
maxLimit, true, false, 250, 1000);
// Apply batched transactions
add (jtBATCH, "batch",
maxLimit, true, false, 250, 1000);
// A Score or Fetch of the UNL (DEPRECATED)
add (jtUNL, "unl",
1, true, false, 0, 0);

View File

@@ -132,16 +132,16 @@ std::uint32_t TxnSignApiFacade::getSeq () const
return accountState_->sle().getFieldU32(sfSequence);
}
Transaction::pointer TxnSignApiFacade::processTransaction (
void TxnSignApiFacade::processTransaction (
Transaction::ref tpTrans,
bool bAdmin,
bool bLocal,
NetworkOPs::FailHard failType)
{
if (!netOPs_) // Unit testing.
return tpTrans;
return;
return netOPs_->processTransaction (tpTrans, bAdmin, bLocal, failType);
netOPs_->processTransaction (tpTrans, bAdmin, bLocal, failType);
}
bool TxnSignApiFacade::findPathsForOneIssuer (

View File

@@ -74,7 +74,7 @@ public:
STPathSet& pathsOut,
STPath& fullLiquidityPath) const;
Transaction::pointer processTransaction (
void processTransaction (
Transaction::ref tpTrans,
bool bAdmin,
bool bLocal,