relay transactions at earliest moment

- update TxQ::apply to minimize preflight/preclaim calls
- add read only open view for NetworkOPs::apply preflight & preclaim
- NetworkOPs::apply call preflight and preclaim, then relay, then apply
This commit is contained in:
Denis Angell
2025-07-19 10:10:25 +02:00
parent 8bfaa7fe0a
commit 7cffafb8ce
5 changed files with 177 additions and 52 deletions

View File

@@ -114,6 +114,9 @@ public:
std::shared_ptr<OpenView const>
current() const;
std::shared_ptr<OpenView const>
read() const;
/** Modify the open ledger
Thread safety:

View File

@@ -54,6 +54,15 @@ OpenLedger::current() const
return current_;
}
std::shared_ptr<OpenView const>
OpenLedger::read() const
{
std::lock_guard lock(current_mutex_);
// Create a copy of the current view for read-only access
// This snapshot won't change even if current_ is updated
return std::make_shared<OpenView const>(*current_);
}
bool
OpenLedger::modify(modify_type const& f)
{

View File

@@ -1491,15 +1491,75 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
bool changed = false;
// Structure to hold preclaim results
std::vector<PreApplyResult> preapplyResults;
preapplyResults.reserve(transactions.size());
{
std::unique_lock ledgerLock{
m_ledgerMaster.peekMutex(), std::defer_lock};
std::lock(masterLock, ledgerLock);
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
for (TransactionStatus& e : transactions)
// Stage 1: Pre-apply and broadcast in single loop
auto newOL = app_.openLedger().read();
for (TransactionStatus& e : transactions)
{
// Use read-only view for preApply
auto const result = app_.getTxQ().preApply(
app_,
*newOL,
e.transaction->getSTransaction(),
e.failType == FailHard::yes ? tapFAIL_HARD : tapNONE,
m_journal);
preapplyResults.push_back(result);
// Immediately broadcast if transaction is likely to claim a fee
bool shouldBroadcast = result.pcresult.likelyToClaimFee;
// Check for hard failure
bool enforceFailHard =
(e.failType == FailHard::yes &&
!isTesSuccess(result.pcresult.ter));
if (shouldBroadcast && !enforceFailHard)
{
// we check before adding to the batch
auto const toSkip = app_.getHashRouter().shouldRelay(
e.transaction->getID());
if (auto const sttx = *(e.transaction->getSTransaction());
toSkip &&
// Skip relaying if it's an inner batch txn and batch
// feature is enabled
!(sttx.isFlag(tfInnerBatchTxn) &&
newOL->rules().enabled(featureBatch)))
{
protocol::TMTransaction tx;
Serializer s;
sttx.add(s);
tx.set_rawtransaction(s.data(), s.size());
tx.set_status(protocol::tsCURRENT);
tx.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(result.pcresult.ter == terQUEUED);
app_.overlay().relay(
e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
}
}
// Stage 2: Actually apply the transactions using pre-computed
// results
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
for (size_t i = 0; i < transactions.size(); ++i)
{
auto& e = transactions[i];
auto const& preResult = preapplyResults[i];
ApplyFlags flags = tapNONE;
if (e.admin)
flags |= tapUNLIMITED;
@@ -1507,8 +1567,16 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.failType == FailHard::yes)
flags |= tapFAIL_HARD;
auto const result = app_.getTxQ().apply(
app_, view, e.transaction->getSTransaction(), flags, j);
// Use the pre-computed results from Stage 1
auto const result = app_.getTxQ().replayApply(
app_,
view,
e.transaction->getSTransaction(),
flags,
preResult.pfresult,
preResult.pcresult,
j);
e.result = result.ter;
e.applied = result.applied;
changed = changed || result.applied;
@@ -1516,6 +1584,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
return changed;
});
}
if (changed)
reportFeeChange();
@@ -1524,6 +1593,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
validatedLedgerIndex = l->info().seq;
auto newOL = app_.openLedger().current();
// Process results (rest of the method remains the same)
for (TransactionStatus& e : transactions)
{
e.transaction->clearSubmitResult();
@@ -1672,36 +1743,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
e.transaction->setKept();
}
if ((e.applied ||
((mMode != OperatingMode::FULL) &&
(e.failType != FailHard::yes) && e.local) ||
(e.result == terQUEUED)) &&
!enforceFailHard)
{
auto const toSkip =
app_.getHashRouter().shouldRelay(e.transaction->getID());
if (auto const sttx = *(e.transaction->getSTransaction());
toSkip &&
// Skip relaying if it's an inner batch txn and batch
// feature is enabled
!(sttx.isFlag(tfInnerBatchTxn) &&
newOL->rules().enabled(featureBatch)))
{
protocol::TMTransaction tx;
Serializer s;
sttx.add(s);
tx.set_rawtransaction(s.data(), s.size());
tx.set_status(protocol::tsCURRENT);
tx.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(e.result == terQUEUED);
// FIXME: This should be when we received it
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
}
if (validatedLedgerIndex)
{
auto [fee, accountSeq, availableSeq] =

View File

@@ -36,6 +36,12 @@
namespace ripple {
struct PreApplyResult
{
PreflightResult pfresult;
PreclaimResult pcresult;
};
class Application;
class Config;
@@ -261,6 +267,31 @@ public:
/// Destructor
virtual ~TxQ();
/**
Prepares the transaction for application to the open ledger.
This is a preflight step that checks the transaction and
prepares it for application.
@return A `PreApplyResult` with the result of the preflight.
*/
PreApplyResult
preApply(
Application& app,
OpenView const& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j);
ApplyResult
replayApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
PreclaimResult const& pcresult,
beast::Journal j);
/**
Add a new transaction to the open ledger, hold it in the queue,
or reject it.
@@ -739,6 +770,7 @@ private:
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreclaimResult const& pcresult,
beast::Journal j);
// Helper function that removes a replaced entry in _byFee.
@@ -845,7 +877,7 @@ private:
AccountMap::iterator const& accountIter,
TxQAccount::TxMap::iterator,
FeeLevel64 feeLevelPaid,
PreflightResult const& pfresult,
PreclaimResult const& pcresult,
std::size_t const txExtraCount,
ApplyFlags flags,
FeeMetrics::Snapshot const& metricsSnapshot,

View File

@@ -522,7 +522,7 @@ TxQ::tryClearAccountQueueUpThruTx(
TxQ::AccountMap::iterator const& accountIter,
TxQAccount::TxMap::iterator beginTxIter,
FeeLevel64 feeLevelPaid,
PreflightResult const& pfresult,
PreclaimResult const& pcresult,
std::size_t const txExtraCount,
ApplyFlags flags,
FeeMetrics::Snapshot const& metricsSnapshot,
@@ -597,7 +597,7 @@ TxQ::tryClearAccountQueueUpThruTx(
}
// Apply the current tx. Because the state of the view has been changed
// by the queued txs, we also need to preclaim again.
auto const txResult = doApply(preclaim(pfresult, app, view), app, view);
auto const txResult = doApply(pcresult, app, view);
if (txResult.applied)
{
@@ -726,12 +726,28 @@ TxQ::tryClearAccountQueueUpThruTx(
// b. The entire queue also has a (dynamic) maximum size. Transactions
// beyond that limit are rejected.
//
PreApplyResult
TxQ::preApply(
Application& app,
OpenView const& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j)
{
PreflightResult const pfresult =
preflight(app, view.rules(), *tx, flags, j);
PreclaimResult const& pcresult = preclaim(pfresult, app, view);
return {pfresult, pcresult};
}
ApplyResult
TxQ::apply(
TxQ::replayApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
PreclaimResult const& pcresult,
beast::Journal j)
{
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
@@ -740,13 +756,15 @@ TxQ::apply(
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
if (!pcresult.likelyToClaimFee)
return {pcresult.ter, false};
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
if (auto directApplied = tryDirectApply(app, view, tx, flags, pcresult, j))
return *directApplied;
// If we get past tryDirectApply() without returning then we expect
@@ -1164,10 +1182,10 @@ TxQ::apply(
// Note that earlier code has already verified that the sequence/ticket
// is valid. So we use a special entry point that runs all of the
// preclaim checks with the exception of the sequence check.
auto const pcresult =
auto const pcresultRetry =
preclaim(pfresult, app, multiTxn ? multiTxn->openView : view);
if (!pcresult.likelyToClaimFee)
return {pcresult.ter, false};
if (!pcresultRetry.likelyToClaimFee)
return {pcresultRetry.ter, false};
// Too low of a fee should get caught by preclaim
XRPL_ASSERT(feeLevelPaid >= baseLevel, "ripple::TxQ::apply : minimum fee");
@@ -1208,7 +1226,7 @@ TxQ::apply(
accountIter,
txIter->first,
feeLevelPaid,
pfresult,
pcresultRetry,
view.txCount(),
flags,
metricsSnapshot,
@@ -1350,6 +1368,28 @@ TxQ::apply(
return {terQUEUED, false};
}
ApplyResult
TxQ::apply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j)
{
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
PreclaimResult const& pcresult = preclaim(pfresult, app, view);
if (!pcresult.likelyToClaimFee)
return {pcresult.ter, false};
return replayApply(app, view, tx, flags, pfresult, pcresult, j);
}
/*
1. Update the fee metrics based on the fee levels of the
txs in the validated ledger and whether consensus is
@@ -1681,6 +1721,7 @@ TxQ::tryDirectApply(
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreclaimResult const& pcresult,
beast::Journal j)
{
auto const account = (*tx)[sfAccount];
@@ -1715,15 +1756,14 @@ TxQ::tryDirectApply(
JLOG(j_.trace()) << "Applying transaction " << transactionID
<< " to open ledger.";
auto const [txnResult, didApply, metadata] =
ripple::apply(app, view, *tx, flags, j);
auto const applyResult = doApply(pcresult, app, view);
JLOG(j_.trace()) << "New transaction " << transactionID
<< (didApply ? " applied successfully with "
: " failed with ")
<< transToken(txnResult);
<< (applyResult.applied ? " applied successfully with "
: " failed with ")
<< transToken(applyResult.ter);
if (didApply)
if (applyResult.applied)
{
// If the applied transaction replaced a transaction in the
// queue then remove the replaced transaction.
@@ -1741,7 +1781,7 @@ TxQ::tryDirectApply(
}
}
}
return ApplyResult{txnResult, didApply, metadata};
return applyResult;
}
return {};
}