From 7cffafb8cefbb4d56238b749b17b871e4d29d72b Mon Sep 17 00:00:00 2001 From: Denis Angell Date: Sat, 19 Jul 2025 10:10:25 +0200 Subject: [PATCH] relay transactions at earliest moment - update TxQ::apply to minimize preflight/preclaim calls - add read only open view for NetworkOPs::apply preflight & preclaim - NetworkOPs::apply call preflight and preclaim, then relay, then apply --- src/xrpld/app/ledger/OpenLedger.h | 3 + src/xrpld/app/ledger/detail/OpenLedger.cpp | 9 ++ src/xrpld/app/misc/NetworkOPs.cpp | 111 ++++++++++++++------- src/xrpld/app/misc/TxQ.h | 34 ++++++- src/xrpld/app/misc/detail/TxQ.cpp | 72 ++++++++++--- 5 files changed, 177 insertions(+), 52 deletions(-) diff --git a/src/xrpld/app/ledger/OpenLedger.h b/src/xrpld/app/ledger/OpenLedger.h index 9fe56ff488..d29e42d9a6 100644 --- a/src/xrpld/app/ledger/OpenLedger.h +++ b/src/xrpld/app/ledger/OpenLedger.h @@ -114,6 +114,9 @@ public: std::shared_ptr current() const; + std::shared_ptr + read() const; + /** Modify the open ledger Thread safety: diff --git a/src/xrpld/app/ledger/detail/OpenLedger.cpp b/src/xrpld/app/ledger/detail/OpenLedger.cpp index 2c98caaa6d..ecf2bfe4e4 100644 --- a/src/xrpld/app/ledger/detail/OpenLedger.cpp +++ b/src/xrpld/app/ledger/detail/OpenLedger.cpp @@ -54,6 +54,15 @@ OpenLedger::current() const return current_; } +std::shared_ptr +OpenLedger::read() const +{ + std::lock_guard lock(current_mutex_); + // Create a copy of the current view for read-only access + // This snapshot won't change even if current_ is updated + return std::make_shared(*current_); +} + bool OpenLedger::modify(modify_type const& f) { diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index a7ddbe912c..13392e35f2 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -1491,15 +1491,75 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) { std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock}; bool changed = false; + + // Structure to hold preclaim results + std::vector preapplyResults; + preapplyResults.reserve(transactions.size()); + { std::unique_lock ledgerLock{ m_ledgerMaster.peekMutex(), std::defer_lock}; std::lock(masterLock, ledgerLock); - app_.openLedger().modify([&](OpenView& view, beast::Journal j) { - for (TransactionStatus& e : transactions) + // Stage 1: Pre-apply and broadcast in single loop + auto newOL = app_.openLedger().read(); + + for (TransactionStatus& e : transactions) + { + // Use read-only view for preApply + auto const result = app_.getTxQ().preApply( + app_, + *newOL, + e.transaction->getSTransaction(), + e.failType == FailHard::yes ? tapFAIL_HARD : tapNONE, + m_journal); + preapplyResults.push_back(result); + + // Immediately broadcast if transaction is likely to claim a fee + bool shouldBroadcast = result.pcresult.likelyToClaimFee; + + // Check for hard failure + bool enforceFailHard = + (e.failType == FailHard::yes && + !isTesSuccess(result.pcresult.ter)); + + if (shouldBroadcast && !enforceFailHard) { - // we check before adding to the batch + auto const toSkip = app_.getHashRouter().shouldRelay( + e.transaction->getID()); + + if (auto const sttx = *(e.transaction->getSTransaction()); + toSkip && + // Skip relaying if it's an inner batch txn and batch + // feature is enabled + !(sttx.isFlag(tfInnerBatchTxn) && + newOL->rules().enabled(featureBatch))) + { + protocol::TMTransaction tx; + Serializer s; + + sttx.add(s); + tx.set_rawtransaction(s.data(), s.size()); + tx.set_status(protocol::tsCURRENT); + tx.set_receivetimestamp( + app_.timeKeeper().now().time_since_epoch().count()); + tx.set_deferred(result.pcresult.ter == terQUEUED); + + app_.overlay().relay( + e.transaction->getID(), tx, *toSkip); + e.transaction->setBroadcast(); + } + } + } + + // Stage 2: Actually apply the transactions using pre-computed + // results + app_.openLedger().modify([&](OpenView& view, beast::Journal j) { + for (size_t i = 0; i < transactions.size(); ++i) + { + auto& e = transactions[i]; + auto const& preResult = preapplyResults[i]; + ApplyFlags flags = tapNONE; if (e.admin) flags |= tapUNLIMITED; @@ -1507,8 +1567,16 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) if (e.failType == FailHard::yes) flags |= tapFAIL_HARD; - auto const result = app_.getTxQ().apply( - app_, view, e.transaction->getSTransaction(), flags, j); + // Use the pre-computed results from Stage 1 + auto const result = app_.getTxQ().replayApply( + app_, + view, + e.transaction->getSTransaction(), + flags, + preResult.pfresult, + preResult.pcresult, + j); + e.result = result.ter; e.applied = result.applied; changed = changed || result.applied; @@ -1516,6 +1584,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) return changed; }); } + if (changed) reportFeeChange(); @@ -1524,6 +1593,8 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) validatedLedgerIndex = l->info().seq; auto newOL = app_.openLedger().current(); + + // Process results (rest of the method remains the same) for (TransactionStatus& e : transactions) { e.transaction->clearSubmitResult(); @@ -1672,36 +1743,6 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) e.transaction->setKept(); } - if ((e.applied || - ((mMode != OperatingMode::FULL) && - (e.failType != FailHard::yes) && e.local) || - (e.result == terQUEUED)) && - !enforceFailHard) - { - auto const toSkip = - app_.getHashRouter().shouldRelay(e.transaction->getID()); - if (auto const sttx = *(e.transaction->getSTransaction()); - toSkip && - // Skip relaying if it's an inner batch txn and batch - // feature is enabled - !(sttx.isFlag(tfInnerBatchTxn) && - newOL->rules().enabled(featureBatch))) - { - protocol::TMTransaction tx; - Serializer s; - - sttx.add(s); - tx.set_rawtransaction(s.data(), s.size()); - tx.set_status(protocol::tsCURRENT); - tx.set_receivetimestamp( - app_.timeKeeper().now().time_since_epoch().count()); - tx.set_deferred(e.result == terQUEUED); - // FIXME: This should be when we received it - app_.overlay().relay(e.transaction->getID(), tx, *toSkip); - e.transaction->setBroadcast(); - } - } - if (validatedLedgerIndex) { auto [fee, accountSeq, availableSeq] = diff --git a/src/xrpld/app/misc/TxQ.h b/src/xrpld/app/misc/TxQ.h index f6ac2c6861..06c7e9b509 100644 --- a/src/xrpld/app/misc/TxQ.h +++ b/src/xrpld/app/misc/TxQ.h @@ -36,6 +36,12 @@ namespace ripple { +struct PreApplyResult +{ + PreflightResult pfresult; + PreclaimResult pcresult; +}; + class Application; class Config; @@ -261,6 +267,31 @@ public: /// Destructor virtual ~TxQ(); + /** + Prepares the transaction for application to the open ledger. + This is a preflight step that checks the transaction and + prepares it for application. + + @return A `PreApplyResult` with the result of the preflight. + */ + PreApplyResult + preApply( + Application& app, + OpenView const& view, + std::shared_ptr const& tx, + ApplyFlags flags, + beast::Journal j); + + ApplyResult + replayApply( + Application& app, + OpenView& view, + std::shared_ptr const& tx, + ApplyFlags flags, + PreflightResult const& pfresult, + PreclaimResult const& pcresult, + beast::Journal j); + /** Add a new transaction to the open ledger, hold it in the queue, or reject it. @@ -739,6 +770,7 @@ private: OpenView& view, std::shared_ptr const& tx, ApplyFlags flags, + PreclaimResult const& pcresult, beast::Journal j); // Helper function that removes a replaced entry in _byFee. @@ -845,7 +877,7 @@ private: AccountMap::iterator const& accountIter, TxQAccount::TxMap::iterator, FeeLevel64 feeLevelPaid, - PreflightResult const& pfresult, + PreclaimResult const& pcresult, std::size_t const txExtraCount, ApplyFlags flags, FeeMetrics::Snapshot const& metricsSnapshot, diff --git a/src/xrpld/app/misc/detail/TxQ.cpp b/src/xrpld/app/misc/detail/TxQ.cpp index 6924dae6c8..5f9526041b 100644 --- a/src/xrpld/app/misc/detail/TxQ.cpp +++ b/src/xrpld/app/misc/detail/TxQ.cpp @@ -522,7 +522,7 @@ TxQ::tryClearAccountQueueUpThruTx( TxQ::AccountMap::iterator const& accountIter, TxQAccount::TxMap::iterator beginTxIter, FeeLevel64 feeLevelPaid, - PreflightResult const& pfresult, + PreclaimResult const& pcresult, std::size_t const txExtraCount, ApplyFlags flags, FeeMetrics::Snapshot const& metricsSnapshot, @@ -597,7 +597,7 @@ TxQ::tryClearAccountQueueUpThruTx( } // Apply the current tx. Because the state of the view has been changed // by the queued txs, we also need to preclaim again. - auto const txResult = doApply(preclaim(pfresult, app, view), app, view); + auto const txResult = doApply(pcresult, app, view); if (txResult.applied) { @@ -726,12 +726,28 @@ TxQ::tryClearAccountQueueUpThruTx( // b. The entire queue also has a (dynamic) maximum size. Transactions // beyond that limit are rejected. // +PreApplyResult +TxQ::preApply( + Application& app, + OpenView const& view, + std::shared_ptr const& tx, + ApplyFlags flags, + beast::Journal j) +{ + PreflightResult const pfresult = + preflight(app, view.rules(), *tx, flags, j); + PreclaimResult const& pcresult = preclaim(pfresult, app, view); + return {pfresult, pcresult}; +} + ApplyResult -TxQ::apply( +TxQ::replayApply( Application& app, OpenView& view, std::shared_ptr const& tx, ApplyFlags flags, + PreflightResult const& pfresult, + PreclaimResult const& pcresult, beast::Journal j) { STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)}; @@ -740,13 +756,15 @@ TxQ::apply( // See if the transaction is valid, properly formed, // etc. before doing potentially expensive queue // replace and multi-transaction operations. - auto const pfresult = preflight(app, view.rules(), *tx, flags, j); if (pfresult.ter != tesSUCCESS) return {pfresult.ter, false}; + if (!pcresult.likelyToClaimFee) + return {pcresult.ter, false}; + // See if the transaction paid a high enough fee that it can go straight // into the ledger. - if (auto directApplied = tryDirectApply(app, view, tx, flags, j)) + if (auto directApplied = tryDirectApply(app, view, tx, flags, pcresult, j)) return *directApplied; // If we get past tryDirectApply() without returning then we expect @@ -1164,10 +1182,10 @@ TxQ::apply( // Note that earlier code has already verified that the sequence/ticket // is valid. So we use a special entry point that runs all of the // preclaim checks with the exception of the sequence check. - auto const pcresult = + auto const pcresultRetry = preclaim(pfresult, app, multiTxn ? multiTxn->openView : view); - if (!pcresult.likelyToClaimFee) - return {pcresult.ter, false}; + if (!pcresultRetry.likelyToClaimFee) + return {pcresultRetry.ter, false}; // Too low of a fee should get caught by preclaim XRPL_ASSERT(feeLevelPaid >= baseLevel, "ripple::TxQ::apply : minimum fee"); @@ -1208,7 +1226,7 @@ TxQ::apply( accountIter, txIter->first, feeLevelPaid, - pfresult, + pcresultRetry, view.txCount(), flags, metricsSnapshot, @@ -1350,6 +1368,28 @@ TxQ::apply( return {terQUEUED, false}; } +ApplyResult +TxQ::apply( + Application& app, + OpenView& view, + std::shared_ptr const& tx, + ApplyFlags flags, + beast::Journal j) +{ + // See if the transaction is valid, properly formed, + // etc. before doing potentially expensive queue + // replace and multi-transaction operations. + auto const pfresult = preflight(app, view.rules(), *tx, flags, j); + if (pfresult.ter != tesSUCCESS) + return {pfresult.ter, false}; + + PreclaimResult const& pcresult = preclaim(pfresult, app, view); + if (!pcresult.likelyToClaimFee) + return {pcresult.ter, false}; + + return replayApply(app, view, tx, flags, pfresult, pcresult, j); +} + /* 1. Update the fee metrics based on the fee levels of the txs in the validated ledger and whether consensus is @@ -1681,6 +1721,7 @@ TxQ::tryDirectApply( OpenView& view, std::shared_ptr const& tx, ApplyFlags flags, + PreclaimResult const& pcresult, beast::Journal j) { auto const account = (*tx)[sfAccount]; @@ -1715,15 +1756,14 @@ TxQ::tryDirectApply( JLOG(j_.trace()) << "Applying transaction " << transactionID << " to open ledger."; - auto const [txnResult, didApply, metadata] = - ripple::apply(app, view, *tx, flags, j); + auto const applyResult = doApply(pcresult, app, view); JLOG(j_.trace()) << "New transaction " << transactionID - << (didApply ? " applied successfully with " - : " failed with ") - << transToken(txnResult); + << (applyResult.applied ? " applied successfully with " + : " failed with ") + << transToken(applyResult.ter); - if (didApply) + if (applyResult.applied) { // If the applied transaction replaced a transaction in the // queue then remove the replaced transaction. @@ -1741,7 +1781,7 @@ TxQ::tryDirectApply( } } } - return ApplyResult{txnResult, didApply, metadata}; + return applyResult; } return {}; }