Apply transaction batches in periodic intervals (#4504)

Add a new transaction submission API field, "sync_mode", which
determines the behavior of the server when submitting transactions:
1) sync (default): Process transactions in a batch immediately,
   and return only once the transaction has been processed.
2) async: Put the transaction into the batch for the next processing
   interval and return immediately.
3) wait: Put the transaction into the batch for the next processing
   interval and return only after it is processed.
Authored by Mark Travis on 2023-08-18 21:58:13 -04:00
Committed by Manoj Doshi
parent 21c4aaf993
commit b580049ec0
24 changed files with 398 additions and 163 deletions
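For reference, a minimal usage sketch of the new "sync_mode" field, modeled on the unit test added later in this commit. It assumes the rippled jtx test harness (a jtx::Env constructed inside a beast unit-test suite); names such as jss::sync_mode and terSUBMITTED are taken from the diff below.

    #include <test/jtx.h>

    // Sketch: submit a signed transaction blob with the new "sync_mode"
    // field. "sync" (the default) and "wait" return the transaction's
    // engine result; "async" returns terSUBMITTED and leaves the
    // transaction for the next 100ms batch interval.
    using namespace ripple;
    using namespace ripple::test::jtx;

    Env env{*this};                    // *this: the enclosing test suite
    Account const alice{"alice"};
    env.fund(XRP(10000), alice);

    auto jt = env.jt(noop(alice));
    Serializer s;
    jt.stx->add(s);

    Json::Value args{Json::objectValue};
    args[jss::tx_blob] = strHex(s.slice());
    args[jss::sync_mode] = "async";    // or "sync" / "wait"
    auto const jr = env.rpc("json", "submit", args.toStyledString());
    // Expect jr[jss::result][jss::engine_result] == "terSUBMITTED" here.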

View File

@@ -199,6 +199,7 @@ install (
src/ripple/basics/StringUtilities.h
src/ripple/basics/TaggedCache.h
src/ripple/basics/tagged_integer.h
src/ripple/basics/SubmitSync.h
src/ripple/basics/ThreadSafetyAnalysis.h
src/ripple/basics/ToString.h
src/ripple/basics/UnorderedContainers.h

View File

@@ -478,7 +478,7 @@
#
# Configure the maximum number of transactions to have in the job queue
#
# Must be a number between 100 and 1000, defaults to 250
# Must be a number between 1000 and 100000, defaults to 10000
#
#
# [overlay]

View File

@@ -467,7 +467,7 @@
#
# Configure the maximum number of transactions to have in the job queue
#
# Must be a number between 100 and 1000, defaults to 250
# Must be a number between 1000 and 100000, defaults to 10000
#
#
# [overlay]

View File

@@ -549,22 +549,25 @@ void
LedgerMaster::applyHeldTransactions()
{
std::lock_guard sl(m_mutex);
// It can be expensive to modify the open ledger even with no transactions
// to process. Regardless, make sure to reset held transactions with
// the parent.
if (mHeldTransactions.size())
{
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
bool any = false;
for (auto const& it : mHeldTransactions)
{
ApplyFlags flags = tapNONE;
auto const result =
app_.getTxQ().apply(app_, view, it.second, flags, j);
if (result.second)
any = true;
}
return any;
});
}
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
bool any = false;
for (auto const& it : mHeldTransactions)
{
ApplyFlags flags = tapNONE;
auto const result =
app_.getTxQ().apply(app_, view, it.second, flags, j);
if (result.second)
any = true;
}
return any;
});
// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
// it.
// VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute.
mHeldTransactions.reset(app_.openLedger().current()->info().parentHash);

View File

@@ -1527,6 +1527,7 @@ ApplicationImp::start(bool withTimers)
{
setSweepTimer();
setEntropyTimer();
m_networkOPs->setBatchApplyTimer();
}
m_io_latency_sampler.start();

View File

@@ -42,6 +42,7 @@
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/PerfLog.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/UptimeClock.h>
#include <ripple/basics/mulDiv.h>
#include <ripple/basics/safe_cast.h>
@@ -233,6 +234,7 @@ public:
, heartbeatTimer_(io_svc)
, clusterTimer_(io_svc)
, accountHistoryTxTimer_(io_svc)
, batchApplyTimer_(io_svc)
, mConsensus(
app,
make_FeeVote(
@@ -282,43 +284,12 @@ public:
processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal,
FailHard failType) override;
/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
*
* @param transaction Transaction object.
* @param bUnlimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void
doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType);
/**
* For transactions not submitted by a locally connected client, fire and
* forget. Add to batch and trigger it to be processed if there's no batch
* currently being applied.
*
* @param transaction Transaction object
* @param bUnlimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void
doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failtype);
/**
* Apply transactions in batches. Continue until none are queued.
*/
void
transactionBatch();
bool
transactionBatch(bool drain) override;
/**
* Attempt to apply transactions and post-process based on the results.
@@ -592,6 +563,15 @@ public:
<< "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message();
}
ec.clear();
batchApplyTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: batchApplyTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done.
using namespace std::chrono_literals;
@@ -710,6 +690,9 @@ private:
void
setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo);
void
setBatchApplyTimer() override;
Application& app_;
beast::Journal m_journal;
@@ -728,6 +711,8 @@ private:
boost::asio::steady_timer heartbeatTimer_;
boost::asio::steady_timer clusterTimer_;
boost::asio::steady_timer accountHistoryTxTimer_;
//! This timer is for applying transaction batches.
boost::asio::steady_timer batchApplyTimer_;
RCLConsensus mConsensus;
@@ -1002,6 +987,42 @@ NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
[this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
}
void
NetworkOPsImp::setBatchApplyTimer()
{
using namespace std::chrono_literals;
// A 100ms lag between batch intervals provides significant throughput
// gains with little added latency. Tuning this figure will require
// further testing. In general, increasing it also increases theoretical
// throughput, but with diminishing returns.
auto constexpr batchInterval = 100ms;
setTimer(
batchApplyTimer_,
batchInterval,
[this]() {
{
std::lock_guard lock(mMutex);
// Only do the job if there's work to do and it's not currently
// being done.
if (mTransactions.size() &&
mDispatchState == DispatchState::none)
{
if (m_job_queue.addJob(
jtBATCH, "transactionBatch", [this]() {
transactionBatch(false);
}))
{
mDispatchState = DispatchState::scheduled;
}
return;
}
}
setBatchApplyTimer();
},
[this]() { setBatchApplyTimer(); });
}
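The re-arming pattern above can be shown in isolation. Below is a simplified, standalone sketch with hypothetical names and plain Boost.Asio (no rippled types): a steady_timer fires every 100ms and only "applies a batch" when work is pending and no batch is currently running. (The real code instead hands the batch to the job queue and re-arms the timer only after the batch completes.)

    #include <boost/asio.hpp>
    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <vector>

    int main()
    {
        boost::asio::io_context io;
        boost::asio::steady_timer timer(io);

        std::mutex m;
        std::vector<int> pending{1, 2, 3};  // stand-in for mTransactions
        bool running = false;               // stand-in for DispatchState::running

        // Re-arm the timer every 100ms; only apply a "batch" when there is
        // pending work and no batch is currently being applied.
        std::function<void()> arm;
        arm = [&] {
            timer.expires_after(std::chrono::milliseconds(100));
            timer.async_wait([&](boost::system::error_code const& ec) {
                if (ec)
                    return;
                {
                    std::lock_guard lock(m);
                    if (!pending.empty() && !running)
                    {
                        running = true;
                        std::cout << "applying batch of " << pending.size()
                                  << " transactions\n";
                        pending.clear();
                        running = false;
                    }
                }
                arm();  // schedule the next interval
            });
        };
        arm();
        io.run_for(std::chrono::milliseconds(350));
        return 0;
    }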
void
NetworkOPsImp::processHeartbeatTimer()
{
@@ -1178,7 +1199,8 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
auto t = tx;
processTransaction(t, false, false, FailHard::no);
processTransaction(
t, false, RPC::SubmitSync::async, false, FailHard::no);
});
}
@@ -1186,6 +1208,7 @@ void
NetworkOPsImp::processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal,
FailHard failType)
{
@@ -1215,7 +1238,7 @@ NetworkOPsImp::processTransaction(
// Not concerned with local checks at this point.
if (validity == Validity::SigBad)
{
JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
JLOG(m_journal.trace()) << "Transaction has bad signature: " << reason;
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
@@ -1225,100 +1248,72 @@ NetworkOPsImp::processTransaction(
// canonicalize can change our pointer
app_.getMasterTransaction().canonicalize(&transaction);
if (bLocal)
doTransactionSync(transaction, bUnlimited, failType);
else
doTransactionAsync(transaction, bUnlimited, failType);
}
void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
{
std::lock_guard lock(mMutex);
if (transaction->getApplying())
return;
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, false, failType));
transaction->setApplying();
if (mDispatchState == DispatchState::none)
{
if (m_job_queue.addJob(
jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
}
}
}
void
NetworkOPsImp::doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
{
std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock lock(mMutex);
if (!transaction->getApplying())
{
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, true, failType));
transaction->setApplying();
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, bLocal, failType));
}
do
switch (sync)
{
if (mDispatchState == DispatchState::running)
{
// A batch processing job is already running, so wait.
mCond.wait(lock);
}
else
{
apply(lock);
if (mTransactions.size())
using enum RPC::SubmitSync;
case sync:
do
{
// More transactions need to be applied, but by another job.
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
transactionBatch();
}))
{
mDispatchState = DispatchState::scheduled;
}
}
}
} while (transaction->getApplying());
// If a batch is being processed, then wait. Otherwise,
// process a batch.
if (mDispatchState == DispatchState::running)
mCond.wait(lock);
else
apply(lock);
} while (transaction->getApplying());
break;
case async:
// It's conceivable for the submitted transaction to be
// processed and its result to be modified before being returned
// to the client. Make a copy of the transaction and set its
// status to guarantee that the client gets the terSUBMITTED
// result in all cases.
transaction = std::make_shared<Transaction>(*transaction);
transaction->setResult(terSUBMITTED);
break;
case wait:
mCond.wait(
lock, [&transaction] { return !transaction->getApplying(); });
break;
default:
assert(false);
}
}
void
NetworkOPsImp::transactionBatch()
bool
NetworkOPsImp::transactionBatch(bool const drain)
{
std::unique_lock<std::mutex> lock(mMutex);
if (mDispatchState == DispatchState::running)
return;
while (mTransactions.size())
{
apply(lock);
std::unique_lock<std::mutex> lock(mMutex);
if (mDispatchState == DispatchState::running || mTransactions.empty())
return false;
do
apply(lock);
while (drain && mTransactions.size());
}
setBatchApplyTimer();
return true;
}
void
NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
assert(!mTransactions.empty());
assert(mDispatchState != DispatchState::running);
std::vector<TransactionStatus> submit_held;
std::vector<TransactionStatus> transactions;
mTransactions.swap(transactions);
assert(!transactions.empty());
assert(mDispatchState != DispatchState::running);
mDispatchState = DispatchState::running;
batchLock.unlock();
@@ -1702,7 +1697,9 @@ NetworkOPsImp::checkLastClosedLedger(
switchLedgers = false;
}
else
{
networkClosed = closedLedger;
}
if (!switchLedgers)
return false;

View File

@@ -71,6 +71,10 @@ enum class OperatingMode {
FULL = 4 //!< we have the ledger and can even validate
};
namespace RPC {
enum class SubmitSync;
}
/** Provides server functionality for clients.
Clients include backend applications, local commands, and connected
@@ -123,22 +127,47 @@ public:
virtual void
submitTransaction(std::shared_ptr<STTx const> const&) = 0;
/**
* Process transactions as they arrive from the network or which are
* submitted by clients. Process local transactions synchronously
/** Process a transaction.
*
* @param transaction Transaction object
* The transaction has been submitted either from the peer network or
* from a client. For client submissions, there are 3 distinct behaviors:
* 1) sync (default): process transactions in a batch immediately,
* and return only once the transaction has been processed.
* 2) async: Put transaction into the batch for the next processing
* interval and return immediately.
* 3) wait: Put transaction into the batch for the next processing
* interval and return only after it is processed.
*
* @param transaction Transaction object.
* @param bUnlimited Whether a privileged client connection submitted it.
* @param bLocal Client submission.
* @param failType fail_hard setting from transaction submission.
* @param sync Client submission synchronous behavior type requested.
* @param bLocal Whether submitted by client (local) or peer.
* @param failType Whether to fail hard or not.
*/
virtual void
processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal,
FailHard failType) = 0;
/** Apply transactions in batches.
*
* Only a single batch is applied unless drain is set. This is to optimize
* performance, because there is significant overhead in applying each
* batch, whereas processing an individual transaction is fast.
*
* Setting the drain parameter is relevant for some transaction
* processing unit tests that expect all submitted transactions to
* be processed synchronously.
*
* @param drain Whether to process batches until none remain.
* @return Whether any transactions were processed.
*/
virtual bool
transactionBatch(bool drain) = 0;
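A hedged usage sketch of the drain parameter, as exercised by the tests added in this commit (assumes a jtx::Env named env): after an "async" or held submission, a test can flush pending batches itself rather than wait for the 100ms timer.

    // Sketch (test-only pattern): drain every pending batch synchronously,
    // then wait for any follow-on jobs the batches scheduled.
    if (env.app().getOPs().transactionBatch(/*drain=*/true))
        env.app().getJobQueue().rendezvous();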
//--------------------------------------------------------------------------
//
// Owner functions
@@ -187,6 +216,8 @@ public:
setStandAlone() = 0;
virtual void
setStateTimer() = 0;
virtual void
setBatchApplyTimer() = 0;
virtual void
setNeedNetworkLedger() = 0;

View File

@@ -134,7 +134,7 @@ applyTransaction(
if (retryAssured)
flags = flags | tapRETRY;
JLOG(j.debug()) << "TXN " << txn.getTransactionID()
JLOG(j.trace()) << "TXN " << txn.getTransactionID()
<< (retryAssured ? "/retry" : "/final");
try
@@ -142,7 +142,7 @@ applyTransaction(
auto const result = apply(app, view, txn, flags, j);
if (result.second)
{
JLOG(j.debug())
JLOG(j.trace())
<< "Transaction applied: " << transHuman(result.first);
return ApplyResult::Success;
}
@@ -151,17 +151,17 @@ applyTransaction(
isTelLocal(result.first))
{
// failure
JLOG(j.debug())
JLOG(j.trace())
<< "Transaction failure: " << transHuman(result.first);
return ApplyResult::Fail;
}
JLOG(j.debug()) << "Transaction retry: " << transHuman(result.first);
JLOG(j.trace()) << "Transaction retry: " << transHuman(result.first);
return ApplyResult::Retry;
}
catch (std::exception const& ex)
{
JLOG(j.warn()) << "Throws: " << ex.what();
JLOG(j.trace()) << "Throws: " << ex.what();
return ApplyResult::Fail;
}
}

View File

@@ -0,0 +1,41 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2023 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_SUBMITSYNC_H_INCLUDED
#define RIPPLE_BASICS_SUBMITSYNC_H_INCLUDED
namespace ripple {
namespace RPC {
/**
* Possible values for defining synchronous behavior of the transaction
* submission API.
* 1) sync (default): Process transactions in a batch immediately,
* and return only once the transaction has been processed.
* 2) async: Put transaction into the batch for the next processing
* interval and return immediately.
* 3) wait: Put transaction into the batch for the next processing
* interval and return only after it is processed.
*/
enum class SubmitSync { sync, async, wait };
} // namespace RPC
} // namespace ripple
#endif

View File

@@ -210,7 +210,7 @@ public:
// Node storage configuration
std::uint32_t LEDGER_HISTORY = 256;
std::uint32_t FETCH_DEPTH = 1000000000;
std::uint32_t FETCH_DEPTH = 1'000'000'000;
// Tunable that adjusts various parameters, typically associated
// with hardware parameters (RAM size and CPU cores). The default
@@ -227,10 +227,11 @@ public:
// Enable the experimental Ledger Replay functionality
bool LEDGER_REPLAY = false;
// Work queue limits
int MAX_TRANSACTIONS = 250;
static constexpr int MAX_JOB_QUEUE_TX = 1000;
static constexpr int MIN_JOB_QUEUE_TX = 100;
// Work queue limits. 10000 transactions is 2 full seconds of slowdown at
// 5000/s.
int MAX_TRANSACTIONS = 10'000;
static constexpr int MAX_JOB_QUEUE_TX = 100'000;
static constexpr int MIN_JOB_QUEUE_TX = 1'000;
// Amendment majority time
std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime;

View File

@@ -28,6 +28,7 @@
#include <ripple/app/misc/Transaction.h>
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/UptimeClock.h>
#include <ripple/basics/base64.h>
#include <ripple/basics/random.h>
@@ -39,13 +40,14 @@
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/impl/Tuning.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/digest.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp>
#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
@@ -3109,7 +3111,11 @@ PeerImp::checkTransaction(
bool const trusted(flags & SF_TRUSTED);
app_.getOPs().processTransaction(
tx, trusted, false, NetworkOPs::FailHard::no);
tx,
trusted,
RPC::SubmitSync::async,
false,
NetworkOPs::FailHard::no);
}
catch (std::exception const& ex)
{

View File

@@ -208,6 +208,7 @@ enum TERcodes : TERUnderlyingType {
terQUEUED, // Transaction is being held in TxQ until fee drops
terPRE_TICKET, // Ticket is not yet in ledger but might be on its way
terNO_AMM, // AMM doesn't exist for the asset pair
terSUBMITTED // Has been submitted async.
};
//------------------------------------------------------------------------------

View File

@@ -189,6 +189,7 @@ transResults()
MAKE_ERROR(terQUEUED, "Held until escalated fee drops."),
MAKE_ERROR(terPRE_TICKET, "Ticket is not yet in ledger."),
MAKE_ERROR(terNO_AMM, "AMM doesn't exist for the asset pair."),
MAKE_ERROR(terSUBMITTED, "Transaction has been submitted."),
MAKE_ERROR(tesSUCCESS, "The transaction was applied. Only final in a validated ledger."),
};

View File

@@ -599,6 +599,7 @@ JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind
JSS(success); // rpc
JSS(supported); // out: AmendmentTableImpl
JSS(sync_mode); // in: Submit
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers

View File

@@ -21,6 +21,7 @@
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/Transaction.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/resource/Fees.h>
@@ -48,6 +49,10 @@ doSubmit(RPC::JsonContext& context)
{
context.loadType = Resource::feeMediumBurdenRPC;
auto const sync = RPC::getSubmitSyncMode(context.params);
if (!sync)
return sync.error();
if (!context.params.isMember(jss::tx_blob))
{
auto const failType = getFailHard(context);
@@ -62,7 +67,8 @@ doSubmit(RPC::JsonContext& context)
context.role,
context.ledgerMaster.getValidatedLedgerAge(),
context.app,
RPC::getProcessTxnFn(context.netOps));
RPC::getProcessTxnFn(context.netOps),
*sync);
ret[jss::deprecated] =
"Signing support in the 'submit' command has been "
@@ -131,7 +137,7 @@ doSubmit(RPC::JsonContext& context)
auto const failType = getFailHard(context);
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
tpTrans, isUnlimited(context.role), *sync, true, failType);
}
catch (std::exception& e)
{

View File

@@ -18,10 +18,12 @@
//==============================================================================
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/Feature.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/TransactionSign.h>
namespace ripple {
@@ -37,13 +39,18 @@ doSubmitMultiSigned(RPC::JsonContext& context)
auto const failHard = context.params[jss::fail_hard].asBool();
auto const failType = NetworkOPs::doFailHard(failHard);
auto const sync = RPC::getSubmitSyncMode(context.params);
if (!sync)
return sync.error();
return RPC::transactionSubmitMultiSigned(
context.params,
failType,
context.role,
context.ledgerMaster.getValidatedLedgerAge(),
context.app,
RPC::getProcessTxnFn(context.netOps));
RPC::getProcessTxnFn(context.netOps),
*sync);
}
} // namespace ripple

View File

@@ -1166,5 +1166,26 @@ getLedgerByContext(RPC::JsonContext& context)
return RPC::make_error(
rpcNOT_READY, "findCreate failed to return an inbound ledger");
}
ripple::Expected<RPC::SubmitSync, Json::Value>
getSubmitSyncMode(Json::Value const& params)
{
using enum RPC::SubmitSync;
if (params.isMember(jss::sync_mode))
{
std::string const syncMode = params[jss::sync_mode].asString();
if (syncMode == "async")
return async;
else if (syncMode == "wait")
return wait;
else if (syncMode != "sync")
return Unexpected(RPC::make_error(
rpcINVALID_PARAMS,
"sync_mode parameter must be one of \"sync\", \"async\", or "
"\"wait\"."));
}
return sync;
}
} // namespace RPC
} // namespace ripple

View File

@@ -26,6 +26,8 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/basics/Expected.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/protocol/SecretKey.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Status.h>
@@ -293,6 +295,14 @@ getAPIVersionNumber(const Json::Value& value, bool betaEnabled);
std::variant<std::shared_ptr<Ledger const>, Json::Value>
getLedgerByContext(RPC::JsonContext& context);
/** Helper to parse the sync_mode parameter to RPC submit.
*
* @param params RPC parameters
* @return Either the mode or an error object.
*/
ripple::Expected<RPC::SubmitSync, Json::Value>
getSubmitSyncMode(Json::Value const& params);
} // namespace RPC
} // namespace ripple

View File

@@ -802,7 +802,8 @@ transactionSubmit(
Role role,
std::chrono::seconds validatedLedgerAge,
Application& app,
ProcessTransactionFn const& processTransaction)
ProcessTransactionFn const& processTransaction,
RPC::SubmitSync sync)
{
using namespace detail;
@@ -828,8 +829,7 @@ transactionSubmit(
// Finally, submit the transaction.
try
{
// FIXME: For performance, should use asynch interface
processTransaction(txn.second, isUnlimited(role), true, failType);
processTransaction(txn.second, isUnlimited(role), sync, failType);
}
catch (std::exception&)
{
@@ -1038,7 +1038,8 @@ transactionSubmitMultiSigned(
Role role,
std::chrono::seconds validatedLedgerAge,
Application& app,
ProcessTransactionFn const& processTransaction)
ProcessTransactionFn const& processTransaction,
RPC::SubmitSync sync)
{
auto const& ledger = app.openLedger().current();
auto j = app.journal("RPCHandler");
@@ -1211,7 +1212,7 @@ transactionSubmitMultiSigned(
try
{
// FIXME: For performance, should use asynch interface
processTransaction(txn.second, isUnlimited(role), true, failType);
processTransaction(txn.second, isUnlimited(role), sync, failType);
}
catch (std::exception&)
{

View File

@@ -21,6 +21,7 @@
#define RIPPLE_RPC_TRANSACTIONSIGN_H_INCLUDED
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/ledger/ApplyView.h>
#include <ripple/rpc/Role.h>
@@ -75,7 +76,7 @@ checkFee(
using ProcessTransactionFn = std::function<void(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
bool bLocal,
RPC::SubmitSync sync,
NetworkOPs::FailHard failType)>;
inline ProcessTransactionFn
@@ -84,9 +85,10 @@ getProcessTxnFn(NetworkOPs& netOPs)
return [&netOPs](
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
bool bLocal,
RPC::SubmitSync sync,
NetworkOPs::FailHard failType) {
netOPs.processTransaction(transaction, bUnlimited, bLocal, failType);
netOPs.processTransaction(
transaction, bUnlimited, sync, true, failType);
};
}
@@ -107,7 +109,8 @@ transactionSubmit(
Role role,
std::chrono::seconds validatedLedgerAge,
Application& app,
ProcessTransactionFn const& processTransaction);
ProcessTransactionFn const& processTransaction,
RPC::SubmitSync sync);
/** Returns a Json::objectValue. */
Json::Value
@@ -126,7 +129,8 @@ transactionSubmitMultiSigned(
Role role,
std::chrono::seconds validatedLedgerAge,
Application& app,
ProcessTransactionFn const& processTransaction);
ProcessTransactionFn const& processTransaction,
RPC::SubmitSync sync);
} // namespace RPC
} // namespace ripple

View File

@@ -15,6 +15,7 @@
*/
//==============================================================================
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/core/JobQueue.h>
#include <ripple/protocol/ErrorCodes.h>
#include <test/jtx.h>
@@ -91,6 +92,7 @@ struct Transaction_ordering_test : public beast::unit_test::suite
env(tx2, ter(terPRE_SEQ));
BEAST_EXPECT(env.seq(alice) == aliceSequence);
env(tx1);
BEAST_EXPECT(env.app().getOPs().transactionBatch(false));
env.app().getJobQueue().rendezvous();
BEAST_EXPECT(env.seq(alice) == aliceSequence + 2);
@@ -143,6 +145,8 @@ struct Transaction_ordering_test : public beast::unit_test::suite
}
env(tx[0]);
// Apply until no more deferred/held transactions.
BEAST_EXPECT(env.app().getOPs().transactionBatch(true));
env.app().getJobQueue().rendezvous();
BEAST_EXPECT(env.seq(alice) == aliceSequence + 5);

View File

@@ -29,6 +29,7 @@
#include <boost/lexical_cast.hpp>
#include <optional>
#include <thread>
#include <utility>
namespace ripple {
@@ -900,6 +901,95 @@ public:
pass();
}
void
testSyncSubmit()
{
using namespace jtx;
Env env(*this);
auto const alice = Account{"alice"};
auto const n = XRP(10000);
env.fund(n, alice);
BEAST_EXPECT(env.balance(alice) == n);
// submit only
auto applyBlobTxn = [&env](char const* syncMode, auto&&... txnArgs) {
auto jt = env.jt(txnArgs...);
Serializer s;
jt.stx->add(s);
Json::Value args{Json::objectValue};
args[jss::tx_blob] = strHex(s.slice());
args[jss::fail_hard] = true;
args[jss::sync_mode] = syncMode;
return env.rpc("json", "submit", args.toStyledString());
};
auto jr = applyBlobTxn("sync", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
jr = applyBlobTxn("async", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "terSUBMITTED");
// Make sure it gets processed before submitting and waiting.
env.app().getOPs().transactionBatch(true);
auto applier = [&env]() {
while (!env.app().getOPs().transactionBatch(false))
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
auto t = std::thread(applier);
jr = applyBlobTxn("wait", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
t.join();
jr = applyBlobTxn("scott", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::error] == "invalidParams");
// sign and submit
auto applyJsonTxn = [&env](
char const* syncMode,
std::string const secret,
Json::Value const& val) {
Json::Value args{Json::objectValue};
args[jss::secret] = secret;
args[jss::tx_json] = val;
args[jss::fail_hard] = true;
args[jss::sync_mode] = syncMode;
return env.rpc("json", "submit", args.toStyledString());
};
Json::Value payment;
auto secret = toBase58(generateSeed("alice"));
payment = noop("alice");
payment[sfSequence.fieldName] = env.seq("alice");
payment[sfSetFlag.fieldName] = 0;
jr = applyJsonTxn("sync", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
payment[sfSequence.fieldName] = env.seq("alice");
jr = applyJsonTxn("async", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "terSUBMITTED");
env.app().getOPs().transactionBatch(true);
payment[sfSequence.fieldName] = env.seq("alice");
auto aSeq = env.seq("alice");
t = std::thread(applier);
jr = applyJsonTxn("wait", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
t.join();
// Ensure the last transaction was processed.
BEAST_EXPECT(env.seq("alice") == aSeq + 1);
payment[sfSequence.fieldName] = env.seq("alice");
jr = applyJsonTxn("scott", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::error] == "invalidParams");
}
void
run() override
{
@@ -925,6 +1015,7 @@ public:
testSignAndSubmit();
testFeatures();
testExceptionalShutdown();
testSyncSubmit();
}
};

View File

@@ -19,6 +19,7 @@
#include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/contract.h>
#include <ripple/beast/unit_test.h>
#include <ripple/core/ConfigSections.h>
@@ -2384,7 +2385,7 @@ public:
fakeProcessTransaction(
std::shared_ptr<Transaction>&,
bool,
bool,
SubmitSync,
NetworkOPs::FailHard)
{
;
@@ -2432,7 +2433,8 @@ public:
Role role,
std::chrono::seconds validatedLedgerAge,
Application & app,
ProcessTransactionFn const& processTransaction);
ProcessTransactionFn const& processTransaction,
RPC::SubmitSync sync);
using TestStuff =
std::tuple<signFunc, submitFunc, char const*, unsigned int>;
@@ -2485,7 +2487,8 @@ public:
testRole,
1s,
env.app(),
processTxn);
processTxn,
RPC::SubmitSync::sync);
}
std::string errStr;

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/beast/unit_test.h>
#include <ripple/core/JobQueue.h>
#include <ripple/protocol/jss.h>
@@ -88,7 +89,8 @@ public:
}
BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tefPAST_SEQ");
// Submit future sequence transaction
// Submit future sequence transaction -- this transaction should be
// held until the sequence gap is closed.
payment[jss::tx_json][sfSequence.fieldName] = env.seq("alice") + 1;
jv = wsc->invoke("submit", payment);
if (wsc->version() == 2)
@@ -114,6 +116,8 @@ public:
}
BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tesSUCCESS");
// Apply held transactions.
env.app().getOPs().transactionBatch(true);
// Wait for the jobqueue to process everything
env.app().getJobQueue().rendezvous();