Revert "Apply transaction batches in periodic intervals (#4504)" (#4852)

This reverts commit 002893f280.

There were two files with conflicts in the automated revert:

- src/ripple/rpc/impl/RPCHelpers.h and
- src/test/rpc/JSONRPC_test.cpp

Those files were manually resolved.
This commit is contained in:
Scott Schurr
2023-12-20 09:30:12 -08:00
committed by GitHub
parent ffb53f2085
commit c53a5e7a72
24 changed files with 167 additions and 402 deletions

View File

@@ -201,7 +201,6 @@ install (
src/ripple/basics/StringUtilities.h src/ripple/basics/StringUtilities.h
src/ripple/basics/TaggedCache.h src/ripple/basics/TaggedCache.h
src/ripple/basics/tagged_integer.h src/ripple/basics/tagged_integer.h
src/ripple/basics/SubmitSync.h
src/ripple/basics/ThreadSafetyAnalysis.h src/ripple/basics/ThreadSafetyAnalysis.h
src/ripple/basics/ToString.h src/ripple/basics/ToString.h
src/ripple/basics/UnorderedContainers.h src/ripple/basics/UnorderedContainers.h

View File

@@ -482,7 +482,7 @@
# #
# Configure the maximum number of transactions to have in the job queue # Configure the maximum number of transactions to have in the job queue
# #
# Must be a number between 1000 and 100000, defaults to 10000 # Must be a number between 100 and 1000, defaults to 250
# #
# #
# [overlay] # [overlay]

View File

@@ -454,7 +454,7 @@
# #
# Configure the maximum number of transactions to have in the job queue # Configure the maximum number of transactions to have in the job queue
# #
# Must be a number between 1000 and 100000, defaults to 10000 # Must be a number between 100 and 1000, defaults to 250
# #
# #
# [overlay] # [overlay]

View File

@@ -549,25 +549,22 @@ void
LedgerMaster::applyHeldTransactions() LedgerMaster::applyHeldTransactions()
{ {
std::lock_guard sl(m_mutex); std::lock_guard sl(m_mutex);
// It can be expensive to modify the open ledger even with no transactions
// to process. Regardless, make sure to reset held transactions with
// the parent.
if (mHeldTransactions.size())
{
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
bool any = false;
for (auto const& it : mHeldTransactions)
{
ApplyFlags flags = tapNONE;
auto const result =
app_.getTxQ().apply(app_, view, it.second, flags, j);
if (result.second)
any = true;
}
return any;
});
}
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
bool any = false;
for (auto const& it : mHeldTransactions)
{
ApplyFlags flags = tapNONE;
auto const result =
app_.getTxQ().apply(app_, view, it.second, flags, j);
if (result.second)
any = true;
}
return any;
});
// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
// it.
// VFALCO NOTE The hash for an open ledger is undefined so we use // VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute. // something that is a reasonable substitute.
mHeldTransactions.reset(app_.openLedger().current()->info().parentHash); mHeldTransactions.reset(app_.openLedger().current()->info().parentHash);

View File

@@ -1533,7 +1533,6 @@ ApplicationImp::start(bool withTimers)
{ {
setSweepTimer(); setSweepTimer();
setEntropyTimer(); setEntropyTimer();
m_networkOPs->setBatchApplyTimer();
} }
m_io_latency_sampler.start(); m_io_latency_sampler.start();

View File

@@ -43,7 +43,6 @@
#include <ripple/app/reporting/ReportingETL.h> #include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h> #include <ripple/app/tx/apply.h>
#include <ripple/basics/PerfLog.h> #include <ripple/basics/PerfLog.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/UptimeClock.h> #include <ripple/basics/UptimeClock.h>
#include <ripple/basics/mulDiv.h> #include <ripple/basics/mulDiv.h>
#include <ripple/basics/safe_cast.h> #include <ripple/basics/safe_cast.h>
@@ -238,7 +237,6 @@ public:
, heartbeatTimer_(io_svc) , heartbeatTimer_(io_svc)
, clusterTimer_(io_svc) , clusterTimer_(io_svc)
, accountHistoryTxTimer_(io_svc) , accountHistoryTxTimer_(io_svc)
, batchApplyTimer_(io_svc)
, mConsensus( , mConsensus(
app, app,
make_FeeVote( make_FeeVote(
@@ -288,12 +286,43 @@ public:
processTransaction( processTransaction(
std::shared_ptr<Transaction>& transaction, std::shared_ptr<Transaction>& transaction,
bool bUnlimited, bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal, bool bLocal,
FailHard failType) override; FailHard failType) override;
bool /**
transactionBatch(bool drain) override; * For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
*
* @param transaction Transaction object.
* @param bUnliimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void
doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType);
/**
* For transactions not submitted by a locally connected client, fire and
* forget. Add to batch and trigger it to be processed if there's no batch
* currently being applied.
*
* @param transaction Transaction object
* @param bUnlimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void
doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failtype);
/**
* Apply transactions in batches. Continue until none are queued.
*/
void
transactionBatch();
/** /**
* Attempt to apply transactions and post-process based on the results. * Attempt to apply transactions and post-process based on the results.
@@ -567,15 +596,6 @@ public:
<< "NetworkOPs: accountHistoryTxTimer cancel error: " << "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message(); << ec.message();
} }
ec.clear();
batchApplyTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: batchApplyTimer cancel error: "
<< ec.message();
}
} }
// Make sure that any waitHandlers pending in our timers are done. // Make sure that any waitHandlers pending in our timers are done.
using namespace std::chrono_literals; using namespace std::chrono_literals;
@@ -697,9 +717,6 @@ private:
void void
setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo); setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo);
void
setBatchApplyTimer() override;
Application& app_; Application& app_;
beast::Journal m_journal; beast::Journal m_journal;
@@ -718,8 +735,6 @@ private:
boost::asio::steady_timer heartbeatTimer_; boost::asio::steady_timer heartbeatTimer_;
boost::asio::steady_timer clusterTimer_; boost::asio::steady_timer clusterTimer_;
boost::asio::steady_timer accountHistoryTxTimer_; boost::asio::steady_timer accountHistoryTxTimer_;
//! This timer is for applying transaction batches.
boost::asio::steady_timer batchApplyTimer_;
RCLConsensus mConsensus; RCLConsensus mConsensus;
@@ -993,42 +1008,6 @@ NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
[this, subInfo]() { setAccountHistoryJobTimer(subInfo); }); [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
} }
void
NetworkOPsImp::setBatchApplyTimer()
{
using namespace std::chrono_literals;
// 100ms lag between batch intervals provides significant throughput gains
// with little increased latency. Tuning this figure further will
// require further testing. In general, increasing this figure will
// also increase theoretical throughput, but with diminishing returns.
auto constexpr batchInterval = 100ms;
setTimer(
batchApplyTimer_,
batchInterval,
[this]() {
{
std::lock_guard lock(mMutex);
// Only do the job if there's work to do and it's not currently
// being done.
if (mTransactions.size() &&
mDispatchState == DispatchState::none)
{
if (m_job_queue.addJob(
jtBATCH, "transactionBatch", [this]() {
transactionBatch(false);
}))
{
mDispatchState = DispatchState::scheduled;
}
return;
}
}
setBatchApplyTimer();
},
[this]() { setBatchApplyTimer(); });
}
void void
NetworkOPsImp::processHeartbeatTimer() NetworkOPsImp::processHeartbeatTimer()
{ {
@@ -1205,8 +1184,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
auto t = tx; auto t = tx;
processTransaction( processTransaction(t, false, false, FailHard::no);
t, false, RPC::SubmitSync::async, false, FailHard::no);
}); });
} }
@@ -1214,7 +1192,6 @@ void
NetworkOPsImp::processTransaction( NetworkOPsImp::processTransaction(
std::shared_ptr<Transaction>& transaction, std::shared_ptr<Transaction>& transaction,
bool bUnlimited, bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal, bool bLocal,
FailHard failType) FailHard failType)
{ {
@@ -1244,7 +1221,7 @@ NetworkOPsImp::processTransaction(
// Not concerned with local checks at this point. // Not concerned with local checks at this point.
if (validity == Validity::SigBad) if (validity == Validity::SigBad)
{ {
JLOG(m_journal.trace()) << "Transaction has bad signature: " << reason; JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
transaction->setStatus(INVALID); transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE); transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD); app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
@@ -1254,72 +1231,100 @@ NetworkOPsImp::processTransaction(
// canonicalize can change our pointer // canonicalize can change our pointer
app_.getMasterTransaction().canonicalize(&transaction); app_.getMasterTransaction().canonicalize(&transaction);
std::unique_lock lock(mMutex); if (bLocal)
if (!transaction->getApplying()) doTransactionSync(transaction, bUnlimited, failType);
else
doTransactionAsync(transaction, bUnlimited, failType);
}
void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
{
std::lock_guard lock(mMutex);
if (transaction->getApplying())
return;
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, false, failType));
transaction->setApplying();
if (mDispatchState == DispatchState::none)
{ {
transaction->setApplying(); if (m_job_queue.addJob(
mTransactions.push_back( jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
TransactionStatus(transaction, bUnlimited, bLocal, failType)); {
} mDispatchState = DispatchState::scheduled;
switch (sync) }
{
using enum RPC::SubmitSync;
case sync:
do
{
// If a batch is being processed, then wait. Otherwise,
// process a batch.
if (mDispatchState == DispatchState::running)
mCond.wait(lock);
else
apply(lock);
} while (transaction->getApplying());
break;
case async:
// It's conceivable for the submitted transaction to be
// processed and its result to be modified before being returned
// to the client. Make a copy of the transaction and set its
// status to guarantee that the client gets the terSUBMITTED
// result in all cases.
transaction = std::make_shared<Transaction>(*transaction);
transaction->setResult(terSUBMITTED);
break;
case wait:
mCond.wait(
lock, [&transaction] { return !transaction->getApplying(); });
break;
default:
assert(false);
} }
} }
bool void
NetworkOPsImp::transactionBatch(bool const drain) NetworkOPsImp::doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
{ {
{ std::unique_lock<std::mutex> lock(mMutex);
std::unique_lock<std::mutex> lock(mMutex);
if (mDispatchState == DispatchState::running || mTransactions.empty())
return false;
do if (!transaction->getApplying())
apply(lock); {
while (drain && mTransactions.size()); mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, true, failType));
transaction->setApplying();
}
do
{
if (mDispatchState == DispatchState::running)
{
// A batch processing job is already running, so wait.
mCond.wait(lock);
}
else
{
apply(lock);
if (mTransactions.size())
{
// More transactions need to be applied, but by another job.
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
transactionBatch();
}))
{
mDispatchState = DispatchState::scheduled;
}
}
}
} while (transaction->getApplying());
}
void
NetworkOPsImp::transactionBatch()
{
std::unique_lock<std::mutex> lock(mMutex);
if (mDispatchState == DispatchState::running)
return;
while (mTransactions.size())
{
apply(lock);
} }
setBatchApplyTimer();
return true;
} }
void void
NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock) NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{ {
assert(!mTransactions.empty());
assert(mDispatchState != DispatchState::running);
std::vector<TransactionStatus> submit_held; std::vector<TransactionStatus> submit_held;
std::vector<TransactionStatus> transactions; std::vector<TransactionStatus> transactions;
mTransactions.swap(transactions); mTransactions.swap(transactions);
assert(!transactions.empty());
assert(mDispatchState != DispatchState::running);
mDispatchState = DispatchState::running; mDispatchState = DispatchState::running;
batchLock.unlock(); batchLock.unlock();
@@ -1703,9 +1708,7 @@ NetworkOPsImp::checkLastClosedLedger(
switchLedgers = false; switchLedgers = false;
} }
else else
{
networkClosed = closedLedger; networkClosed = closedLedger;
}
if (!switchLedgers) if (!switchLedgers)
return false; return false;

View File

@@ -71,10 +71,6 @@ enum class OperatingMode {
FULL = 4 //!< we have the ledger and can even validate FULL = 4 //!< we have the ledger and can even validate
}; };
namespace RPC {
enum class SubmitSync;
}
/** Provides server functionality for clients. /** Provides server functionality for clients.
Clients include backend applications, local commands, and connected Clients include backend applications, local commands, and connected
@@ -127,47 +123,22 @@ public:
virtual void virtual void
submitTransaction(std::shared_ptr<STTx const> const&) = 0; submitTransaction(std::shared_ptr<STTx const> const&) = 0;
/** Process a transaction. /**
* Process transactions as they arrive from the network or which are
* submitted by clients. Process local transactions synchronously
* *
* The transaction has been submitted either from the peer network or * @param transaction Transaction object
* from a client. For client submissions, there are 3 distinct behaviors:
* 1) sync (default): process transactions in a batch immediately,
* and return only once the transaction has been processed.
* 2) async: Put transaction into the batch for the next processing
* interval and return immediately.
* 3) wait: Put transaction into the batch for the next processing
* interval and return only after it is processed.
*
* @param transaction Transaction object.
* @param bUnlimited Whether a privileged client connection submitted it. * @param bUnlimited Whether a privileged client connection submitted it.
* @param sync Client submission synchronous behavior type requested. * @param bLocal Client submission.
* @param bLocal Whether submitted by client (local) or peer. * @param failType fail_hard setting from transaction submission.
* @param failType Whether to fail hard or not.
*/ */
virtual void virtual void
processTransaction( processTransaction(
std::shared_ptr<Transaction>& transaction, std::shared_ptr<Transaction>& transaction,
bool bUnlimited, bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal, bool bLocal,
FailHard failType) = 0; FailHard failType) = 0;
/** Apply transactions in batches.
*
* Only a single batch unless drain is set. This is to optimize performance
* because there is significant overhead in applying each batch, whereas
* processing an individual transaction is fast.
*
* Setting the drain parameter is relevant for some transaction
* processing unit tests that expect all submitted transactions to
* be processed synchronously.
*
* @param drain Whether to process batches until none remain.
* @return Whether any transactions were processed.
*/
virtual bool
transactionBatch(bool drain) = 0;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
// Owner functions // Owner functions
@@ -216,8 +187,6 @@ public:
setStandAlone() = 0; setStandAlone() = 0;
virtual void virtual void
setStateTimer() = 0; setStateTimer() = 0;
virtual void
setBatchApplyTimer() = 0;
virtual void virtual void
setNeedNetworkLedger() = 0; setNeedNetworkLedger() = 0;

View File

@@ -134,7 +134,7 @@ applyTransaction(
if (retryAssured) if (retryAssured)
flags = flags | tapRETRY; flags = flags | tapRETRY;
JLOG(j.trace()) << "TXN " << txn.getTransactionID() JLOG(j.debug()) << "TXN " << txn.getTransactionID()
<< (retryAssured ? "/retry" : "/final"); << (retryAssured ? "/retry" : "/final");
try try
@@ -142,7 +142,7 @@ applyTransaction(
auto const result = apply(app, view, txn, flags, j); auto const result = apply(app, view, txn, flags, j);
if (result.second) if (result.second)
{ {
JLOG(j.trace()) JLOG(j.debug())
<< "Transaction applied: " << transHuman(result.first); << "Transaction applied: " << transHuman(result.first);
return ApplyResult::Success; return ApplyResult::Success;
} }
@@ -151,17 +151,17 @@ applyTransaction(
isTelLocal(result.first)) isTelLocal(result.first))
{ {
// failure // failure
JLOG(j.trace()) JLOG(j.debug())
<< "Transaction failure: " << transHuman(result.first); << "Transaction failure: " << transHuman(result.first);
return ApplyResult::Fail; return ApplyResult::Fail;
} }
JLOG(j.trace()) << "Transaction retry: " << transHuman(result.first); JLOG(j.debug()) << "Transaction retry: " << transHuman(result.first);
return ApplyResult::Retry; return ApplyResult::Retry;
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {
JLOG(j.trace()) << "Throws: " << ex.what(); JLOG(j.warn()) << "Throws: " << ex.what();
return ApplyResult::Fail; return ApplyResult::Fail;
} }
} }

View File

@@ -1,41 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2023 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_SUBMITSYNC_H_INCLUDED
#define RIPPLE_BASICS_SUBMITSYNC_H_INCLUDED
namespace ripple {
namespace RPC {
/**
* Possible values for defining synchronous behavior of the transaction
* submission API.
* 1) sync (default): Process transactions in a batch immediately,
* and return only once the transaction has been processed.
* 2) async: Put transaction into the batch for the next processing
* interval and return immediately.
* 3) wait: Put transaction into the batch for the next processing
* interval and return only after it is processed.
*/
enum class SubmitSync { sync, async, wait };
} // namespace RPC
} // namespace ripple
#endif

View File

@@ -215,7 +215,7 @@ public:
// Node storage configuration // Node storage configuration
std::uint32_t LEDGER_HISTORY = 256; std::uint32_t LEDGER_HISTORY = 256;
std::uint32_t FETCH_DEPTH = 1'000'000'000; std::uint32_t FETCH_DEPTH = 1000000000;
// Tunable that adjusts various parameters, typically associated // Tunable that adjusts various parameters, typically associated
// with hardware parameters (RAM size and CPU cores). The default // with hardware parameters (RAM size and CPU cores). The default
@@ -232,11 +232,10 @@ public:
// Enable the experimental Ledger Replay functionality // Enable the experimental Ledger Replay functionality
bool LEDGER_REPLAY = false; bool LEDGER_REPLAY = false;
// Work queue limits. 10000 transactions is 2 full seconds of slowdown at // Work queue limits
// 5000/s. int MAX_TRANSACTIONS = 250;
int MAX_TRANSACTIONS = 10'000; static constexpr int MAX_JOB_QUEUE_TX = 1000;
static constexpr int MAX_JOB_QUEUE_TX = 100'000; static constexpr int MIN_JOB_QUEUE_TX = 100;
static constexpr int MIN_JOB_QUEUE_TX = 1'000;
// Amendment majority time // Amendment majority time
std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime; std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime;

View File

@@ -28,7 +28,6 @@
#include <ripple/app/misc/Transaction.h> #include <ripple/app/misc/Transaction.h>
#include <ripple/app/misc/ValidatorList.h> #include <ripple/app/misc/ValidatorList.h>
#include <ripple/app/tx/apply.h> #include <ripple/app/tx/apply.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/UptimeClock.h> #include <ripple/basics/UptimeClock.h>
#include <ripple/basics/base64.h> #include <ripple/basics/base64.h>
#include <ripple/basics/random.h> #include <ripple/basics/random.h>
@@ -40,14 +39,13 @@
#include <ripple/overlay/impl/PeerImp.h> #include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/impl/Tuning.h> #include <ripple/overlay/impl/Tuning.h>
#include <ripple/overlay/predicates.h> #include <ripple/overlay/predicates.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/digest.h> #include <ripple/protocol/digest.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp> #include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp> #include <boost/beast/core/ostream.hpp>
#include <algorithm> #include <algorithm>
#include <chrono>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <numeric> #include <numeric>
@@ -3111,11 +3109,7 @@ PeerImp::checkTransaction(
bool const trusted(flags & SF_TRUSTED); bool const trusted(flags & SF_TRUSTED);
app_.getOPs().processTransaction( app_.getOPs().processTransaction(
tx, tx, trusted, false, NetworkOPs::FailHard::no);
trusted,
RPC::SubmitSync::async,
false,
NetworkOPs::FailHard::no);
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {

View File

@@ -218,7 +218,6 @@ enum TERcodes : TERUnderlyingType {
terQUEUED, // Transaction is being held in TxQ until fee drops terQUEUED, // Transaction is being held in TxQ until fee drops
terPRE_TICKET, // Ticket is not yet in ledger but might be on its way terPRE_TICKET, // Ticket is not yet in ledger but might be on its way
terNO_AMM, // AMM doesn't exist for the asset pair terNO_AMM, // AMM doesn't exist for the asset pair
terSUBMITTED // Has been submitted async.
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -211,7 +211,6 @@ transResults()
MAKE_ERROR(terQUEUED, "Held until escalated fee drops."), MAKE_ERROR(terQUEUED, "Held until escalated fee drops."),
MAKE_ERROR(terPRE_TICKET, "Ticket is not yet in ledger."), MAKE_ERROR(terPRE_TICKET, "Ticket is not yet in ledger."),
MAKE_ERROR(terNO_AMM, "AMM doesn't exist for the asset pair."), MAKE_ERROR(terNO_AMM, "AMM doesn't exist for the asset pair."),
MAKE_ERROR(terSUBMITTED, "Transaction has been submitted."),
MAKE_ERROR(tesSUCCESS, "The transaction was applied. Only final in a validated ledger."), MAKE_ERROR(tesSUCCESS, "The transaction was applied. Only final in a validated ledger."),
}; };

View File

@@ -636,7 +636,6 @@ JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind JSS(subcommand); // in: PathFind
JSS(success); // rpc JSS(success); // rpc
JSS(supported); // out: AmendmentTableImpl JSS(supported); // out: AmendmentTableImpl
JSS(sync_mode); // in: Submit
JSS(system_time_offset); // out: NetworkOPs JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers JSS(taker); // in: Subscribe, BookOffers

View File

@@ -21,7 +21,6 @@
#include <ripple/app/misc/HashRouter.h> #include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/Transaction.h> #include <ripple/app/misc/Transaction.h>
#include <ripple/app/tx/apply.h> #include <ripple/app/tx/apply.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/net/RPCErr.h> #include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h> #include <ripple/protocol/ErrorCodes.h>
#include <ripple/resource/Fees.h> #include <ripple/resource/Fees.h>
@@ -49,10 +48,6 @@ doSubmit(RPC::JsonContext& context)
{ {
context.loadType = Resource::feeMediumBurdenRPC; context.loadType = Resource::feeMediumBurdenRPC;
auto const sync = RPC::getSubmitSyncMode(context.params);
if (!sync)
return sync.error();
if (!context.params.isMember(jss::tx_blob)) if (!context.params.isMember(jss::tx_blob))
{ {
auto const failType = getFailHard(context); auto const failType = getFailHard(context);
@@ -68,8 +63,7 @@ doSubmit(RPC::JsonContext& context)
context.role, context.role,
context.ledgerMaster.getValidatedLedgerAge(), context.ledgerMaster.getValidatedLedgerAge(),
context.app, context.app,
RPC::getProcessTxnFn(context.netOps), RPC::getProcessTxnFn(context.netOps));
*sync);
ret[jss::deprecated] = ret[jss::deprecated] =
"Signing support in the 'submit' command has been " "Signing support in the 'submit' command has been "
@@ -138,7 +132,7 @@ doSubmit(RPC::JsonContext& context)
auto const failType = getFailHard(context); auto const failType = getFailHard(context);
context.netOps.processTransaction( context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), *sync, true, failType); tpTrans, isUnlimited(context.role), true, failType);
} }
catch (std::exception& e) catch (std::exception& e)
{ {

View File

@@ -18,12 +18,10 @@
//============================================================================== //==============================================================================
#include <ripple/app/ledger/LedgerMaster.h> #include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/protocol/ErrorCodes.h> #include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/Feature.h> #include <ripple/protocol/Feature.h>
#include <ripple/resource/Fees.h> #include <ripple/resource/Fees.h>
#include <ripple/rpc/Context.h> #include <ripple/rpc/Context.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/TransactionSign.h> #include <ripple/rpc/impl/TransactionSign.h>
namespace ripple { namespace ripple {
@@ -39,10 +37,6 @@ doSubmitMultiSigned(RPC::JsonContext& context)
auto const failHard = context.params[jss::fail_hard].asBool(); auto const failHard = context.params[jss::fail_hard].asBool();
auto const failType = NetworkOPs::doFailHard(failHard); auto const failType = NetworkOPs::doFailHard(failHard);
auto const sync = RPC::getSubmitSyncMode(context.params);
if (!sync)
return sync.error();
return RPC::transactionSubmitMultiSigned( return RPC::transactionSubmitMultiSigned(
context.params, context.params,
context.apiVersion, context.apiVersion,
@@ -50,8 +44,7 @@ doSubmitMultiSigned(RPC::JsonContext& context)
context.role, context.role,
context.ledgerMaster.getValidatedLedgerAge(), context.ledgerMaster.getValidatedLedgerAge(),
context.app, context.app,
RPC::getProcessTxnFn(context.netOps), RPC::getProcessTxnFn(context.netOps));
*sync);
} }
} // namespace ripple } // namespace ripple

View File

@@ -1125,26 +1125,5 @@ getLedgerByContext(RPC::JsonContext& context)
return RPC::make_error( return RPC::make_error(
rpcNOT_READY, "findCreate failed to return an inbound ledger"); rpcNOT_READY, "findCreate failed to return an inbound ledger");
} }
ripple::Expected<RPC::SubmitSync, Json::Value>
getSubmitSyncMode(Json::Value const& params)
{
using enum RPC::SubmitSync;
if (params.isMember(jss::sync_mode))
{
std::string const syncMode = params[jss::sync_mode].asString();
if (syncMode == "async")
return async;
else if (syncMode == "wait")
return wait;
else if (syncMode != "sync")
return Unexpected(RPC::make_error(
rpcINVALID_PARAMS,
"sync_mode parameter must be one of \"sync\", \"async\", or "
"\"wait\"."));
}
return sync;
}
} // namespace RPC } // namespace RPC
} // namespace ripple } // namespace ripple

View File

@@ -26,8 +26,6 @@
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/TxQ.h> #include <ripple/app/misc/TxQ.h>
#include <ripple/basics/Expected.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/protocol/SecretKey.h> #include <ripple/protocol/SecretKey.h>
#include <ripple/rpc/Context.h> #include <ripple/rpc/Context.h>
#include <ripple/rpc/Status.h> #include <ripple/rpc/Status.h>
@@ -294,14 +292,6 @@ keypairForSignature(
Json::Value const& params, Json::Value const& params,
Json::Value& error, Json::Value& error,
unsigned int apiVersion = apiVersionIfUnspecified); unsigned int apiVersion = apiVersionIfUnspecified);
/** Helper to parse submit_mode parameter to RPC submit.
*
* @param params RPC parameters
* @return Either the mode or an error object.
*/
ripple::Expected<RPC::SubmitSync, Json::Value>
getSubmitSyncMode(Json::Value const& params);
} // namespace RPC } // namespace RPC
} // namespace ripple } // namespace ripple

View File

@@ -834,8 +834,7 @@ transactionSubmit(
Role role, Role role,
std::chrono::seconds validatedLedgerAge, std::chrono::seconds validatedLedgerAge,
Application& app, Application& app,
ProcessTransactionFn const& processTransaction, ProcessTransactionFn const& processTransaction)
RPC::SubmitSync sync)
{ {
using namespace detail; using namespace detail;
@@ -861,7 +860,8 @@ transactionSubmit(
// Finally, submit the transaction. // Finally, submit the transaction.
try try
{ {
processTransaction(txn.second, isUnlimited(role), sync, failType); // FIXME: For performance, should use asynch interface
processTransaction(txn.second, isUnlimited(role), true, failType);
} }
catch (std::exception&) catch (std::exception&)
{ {
@@ -1072,8 +1072,7 @@ transactionSubmitMultiSigned(
Role role, Role role,
std::chrono::seconds validatedLedgerAge, std::chrono::seconds validatedLedgerAge,
Application& app, Application& app,
ProcessTransactionFn const& processTransaction, ProcessTransactionFn const& processTransaction)
RPC::SubmitSync sync)
{ {
auto const& ledger = app.openLedger().current(); auto const& ledger = app.openLedger().current();
auto j = app.journal("RPCHandler"); auto j = app.journal("RPCHandler");
@@ -1246,7 +1245,7 @@ transactionSubmitMultiSigned(
try try
{ {
// FIXME: For performance, should use asynch interface // FIXME: For performance, should use asynch interface
processTransaction(txn.second, isUnlimited(role), sync, failType); processTransaction(txn.second, isUnlimited(role), true, failType);
} }
catch (std::exception&) catch (std::exception&)
{ {

View File

@@ -21,7 +21,6 @@
#define RIPPLE_RPC_TRANSACTIONSIGN_H_INCLUDED #define RIPPLE_RPC_TRANSACTIONSIGN_H_INCLUDED
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/ledger/ApplyView.h> #include <ripple/ledger/ApplyView.h>
#include <ripple/rpc/Role.h> #include <ripple/rpc/Role.h>
@@ -76,7 +75,7 @@ checkFee(
using ProcessTransactionFn = std::function<void( using ProcessTransactionFn = std::function<void(
std::shared_ptr<Transaction>& transaction, std::shared_ptr<Transaction>& transaction,
bool bUnlimited, bool bUnlimited,
RPC::SubmitSync sync, bool bLocal,
NetworkOPs::FailHard failType)>; NetworkOPs::FailHard failType)>;
inline ProcessTransactionFn inline ProcessTransactionFn
@@ -85,10 +84,9 @@ getProcessTxnFn(NetworkOPs& netOPs)
return [&netOPs]( return [&netOPs](
std::shared_ptr<Transaction>& transaction, std::shared_ptr<Transaction>& transaction,
bool bUnlimited, bool bUnlimited,
RPC::SubmitSync sync, bool bLocal,
NetworkOPs::FailHard failType) { NetworkOPs::FailHard failType) {
netOPs.processTransaction( netOPs.processTransaction(transaction, bUnlimited, bLocal, failType);
transaction, bUnlimited, sync, true, failType);
}; };
} }
@@ -111,8 +109,7 @@ transactionSubmit(
Role role, Role role,
std::chrono::seconds validatedLedgerAge, std::chrono::seconds validatedLedgerAge,
Application& app, Application& app,
ProcessTransactionFn const& processTransaction, ProcessTransactionFn const& processTransaction);
RPC::SubmitSync sync);
/** Returns a Json::objectValue. */ /** Returns a Json::objectValue. */
Json::Value Json::Value
@@ -133,8 +130,7 @@ transactionSubmitMultiSigned(
Role role, Role role,
std::chrono::seconds validatedLedgerAge, std::chrono::seconds validatedLedgerAge,
Application& app, Application& app,
ProcessTransactionFn const& processTransaction, ProcessTransactionFn const& processTransaction);
RPC::SubmitSync sync);
} // namespace RPC } // namespace RPC
} // namespace ripple } // namespace ripple

View File

@@ -15,7 +15,6 @@
*/ */
//============================================================================== //==============================================================================
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/core/JobQueue.h> #include <ripple/core/JobQueue.h>
#include <ripple/protocol/ErrorCodes.h> #include <ripple/protocol/ErrorCodes.h>
#include <test/jtx.h> #include <test/jtx.h>
@@ -92,7 +91,6 @@ struct Transaction_ordering_test : public beast::unit_test::suite
env(tx2, ter(terPRE_SEQ)); env(tx2, ter(terPRE_SEQ));
BEAST_EXPECT(env.seq(alice) == aliceSequence); BEAST_EXPECT(env.seq(alice) == aliceSequence);
env(tx1); env(tx1);
BEAST_EXPECT(env.app().getOPs().transactionBatch(false));
env.app().getJobQueue().rendezvous(); env.app().getJobQueue().rendezvous();
BEAST_EXPECT(env.seq(alice) == aliceSequence + 2); BEAST_EXPECT(env.seq(alice) == aliceSequence + 2);
@@ -145,8 +143,6 @@ struct Transaction_ordering_test : public beast::unit_test::suite
} }
env(tx[0]); env(tx[0]);
// Apply until no more deferred/held transactions.
BEAST_EXPECT(env.app().getOPs().transactionBatch(true));
env.app().getJobQueue().rendezvous(); env.app().getJobQueue().rendezvous();
BEAST_EXPECT(env.seq(alice) == aliceSequence + 5); BEAST_EXPECT(env.seq(alice) == aliceSequence + 5);

View File

@@ -29,7 +29,6 @@
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <optional> #include <optional>
#include <thread>
#include <utility> #include <utility>
namespace ripple { namespace ripple {
@@ -901,95 +900,6 @@ public:
pass(); pass();
} }
void
testSyncSubmit()
{
using namespace jtx;
Env env(*this);
auto const alice = Account{"alice"};
auto const n = XRP(10000);
env.fund(n, alice);
BEAST_EXPECT(env.balance(alice) == n);
// submit only
auto applyBlobTxn = [&env](char const* syncMode, auto&&... txnArgs) {
auto jt = env.jt(txnArgs...);
Serializer s;
jt.stx->add(s);
Json::Value args{Json::objectValue};
args[jss::tx_blob] = strHex(s.slice());
args[jss::fail_hard] = true;
args[jss::sync_mode] = syncMode;
return env.rpc("json", "submit", args.toStyledString());
};
auto jr = applyBlobTxn("sync", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
jr = applyBlobTxn("async", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "terSUBMITTED");
// Make sure it gets processed before submitting and waiting.
env.app().getOPs().transactionBatch(true);
auto applier = [&env]() {
while (!env.app().getOPs().transactionBatch(false))
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
auto t = std::thread(applier);
jr = applyBlobTxn("wait", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
t.join();
jr = applyBlobTxn("scott", noop(alice));
BEAST_EXPECT(jr[jss::result][jss::error] == "invalidParams");
// sign and submit
auto applyJsonTxn = [&env](
char const* syncMode,
std::string const secret,
Json::Value const& val) {
Json::Value args{Json::objectValue};
args[jss::secret] = secret;
args[jss::tx_json] = val;
args[jss::fail_hard] = true;
args[jss::sync_mode] = syncMode;
return env.rpc("json", "submit", args.toStyledString());
};
Json::Value payment;
auto secret = toBase58(generateSeed("alice"));
payment = noop("alice");
payment[sfSequence.fieldName] = env.seq("alice");
payment[sfSetFlag.fieldName] = 0;
jr = applyJsonTxn("sync", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
payment[sfSequence.fieldName] = env.seq("alice");
jr = applyJsonTxn("async", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "terSUBMITTED");
env.app().getOPs().transactionBatch(true);
payment[sfSequence.fieldName] = env.seq("alice");
auto aSeq = env.seq("alice");
t = std::thread(applier);
jr = applyJsonTxn("wait", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS");
t.join();
// Ensure the last transaction was processed.
BEAST_EXPECT(env.seq("alice") == aSeq + 1);
payment[sfSequence.fieldName] = env.seq("alice");
jr = applyJsonTxn("scott", secret, payment);
BEAST_EXPECT(jr[jss::result][jss::error] == "invalidParams");
}
void void
run() override run() override
{ {
@@ -1015,7 +925,6 @@ public:
testSignAndSubmit(); testSignAndSubmit();
testFeatures(); testFeatures();
testExceptionalShutdown(); testExceptionalShutdown();
testSyncSubmit();
} }
}; };

View File

@@ -19,7 +19,6 @@
#include <ripple/app/misc/LoadFeeTrack.h> #include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/TxQ.h> #include <ripple/app/misc/TxQ.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/contract.h> #include <ripple/basics/contract.h>
#include <ripple/beast/unit_test.h> #include <ripple/beast/unit_test.h>
#include <ripple/core/ConfigSections.h> #include <ripple/core/ConfigSections.h>
@@ -2499,7 +2498,7 @@ public:
fakeProcessTransaction( fakeProcessTransaction(
std::shared_ptr<Transaction>&, std::shared_ptr<Transaction>&,
bool, bool,
SubmitSync, bool,
NetworkOPs::FailHard) NetworkOPs::FailHard)
{ {
; ;
@@ -2549,8 +2548,7 @@ public:
Role role, Role role,
std::chrono::seconds validatedLedgerAge, std::chrono::seconds validatedLedgerAge,
Application& app, Application& app,
ProcessTransactionFn const& processTransaction, ProcessTransactionFn const& processTransaction);
RPC::SubmitSync sync);
using TestStuff = using TestStuff =
std::tuple<signFunc, submitFunc, char const*, unsigned int>; std::tuple<signFunc, submitFunc, char const*, unsigned int>;
@@ -2605,8 +2603,7 @@ public:
testRole, testRole,
1s, 1s,
env.app(), env.app(),
processTxn, processTxn);
RPC::SubmitSync::sync);
} }
std::string errStr; std::string errStr;

View File

@@ -17,7 +17,6 @@
*/ */
//============================================================================== //==============================================================================
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/beast/unit_test.h> #include <ripple/beast/unit_test.h>
#include <ripple/core/JobQueue.h> #include <ripple/core/JobQueue.h>
#include <ripple/protocol/jss.h> #include <ripple/protocol/jss.h>
@@ -89,8 +88,7 @@ public:
} }
BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tefPAST_SEQ"); BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tefPAST_SEQ");
// Submit future sequence transaction -- this transaction should be // Submit future sequence transaction
// held until the sequence gap is closed.
payment[jss::tx_json][sfSequence.fieldName] = env.seq("alice") + 1; payment[jss::tx_json][sfSequence.fieldName] = env.seq("alice") + 1;
jv = wsc->invoke("submit", payment); jv = wsc->invoke("submit", payment);
if (wsc->version() == 2) if (wsc->version() == 2)
@@ -116,8 +114,6 @@ public:
} }
BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tesSUCCESS"); BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tesSUCCESS");
// Apply held transactions.
env.app().getOPs().transactionBatch(true);
// Wait for the jobqueue to process everything // Wait for the jobqueue to process everything
env.app().getJobQueue().rendezvous(); env.app().getJobQueue().rendezvous();