feat: add ledger range-based TX priority for faster tx detection

Simplify TX priority mechanism using RangeSet instead of per-TX hash
tracking. When submit_and_wait is called, it registers a ledger range
where TX nodes should be fetched before state nodes.

Key changes:
- Add prioritizeTxForLedgers(start, end) and isTxPrioritized(seq)
  to InboundLedgers using RangeSet<uint32_t>
- InboundLedger::trigger() checks range to decide TX-before-state order
- Remove the complex per-TX hash tracking, which could not help because of
  the Merkle tree structure (parent hashes are needed before children can
  be requested)
- Format CMake and source files
This commit is contained in:
Nicholas Dudfield
2025-12-01 15:31:15 +07:00
parent aeb2888fe9
commit 9ddf649e2a
8 changed files with 1032 additions and 964 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -126,6 +126,14 @@ public:
bool
hasTx(uint256 const& txHash) const;
/** Number of transaction hashes currently held in knownTxHashes_.
    Meant for debugging output; the size is read while mtx_ is held.
    @return Current element count of knownTxHashes_.
*/
std::size_t
knownTxCount() const
{
    ScopedLockType guard(mtx_);
    std::size_t const count = knownTxHashes_.size();
    return count;
}
void
touch()
{

View File

@@ -83,6 +83,19 @@ public:
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;
/** Add a ledger range where TX fetching should be prioritized.
Ledgers in this range will fetch TX nodes BEFORE state nodes.
Used by submit_and_wait to quickly detect transactions.
NOTE(review): behaviour for start > end is not specified by this
interface - confirm what implementations do with an inverted range.
@param start First ledger sequence (inclusive)
@param end Last ledger sequence (inclusive)
*/
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;
/** Check if TX fetching should be prioritized for a ledger sequence.
@param seq Ledger sequence to test.
@return true if TX nodes should be fetched before state nodes for seq.
Presumably this means seq lies inside a range registered through
prioritizeTxForLedgers - confirm per implementation.
*/
virtual bool
isTxPrioritized(std::uint32_t seq) const = 0;
// VFALCO TODO Remove the dependency on the Peer object.
//
virtual bool

View File

@@ -101,6 +101,8 @@ InboundLedger::InboundLedger(
, mPeerSet(std::move(peerSet))
{
JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
JLOG(app_.journal("TxTrack").warn())
<< "NEW LEDGER seq=" << seq << " hash=" << hash;
touch();
}
@@ -736,7 +738,12 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
if (mHaveHeader && !mHaveState && !failed_)
// When TX is prioritized for this ledger range, skip state until TX
// complete.
bool const txPrioritized =
mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq);
if (mHaveHeader && !mHaveState && !failed_ &&
!(txPrioritized && !mHaveTransactions))
{
assert(mLedger);
@@ -955,6 +962,9 @@ InboundLedger::takeHeader(std::string const& data)
mLedger->txMap().setLedgerSeq(mSeq);
mHaveHeader = true;
JLOG(app_.journal("TxTrack").warn())
<< "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash;
Serializer s(data.size() + 4);
s.add32(HashPrefix::ledgerMaster);
s.addRaw(data.data(), data.size());
@@ -1040,7 +1050,13 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
uint256 txHash;
std::memcpy(
txHash.data(), data.data() + data.size() - 33, 32);
knownTxHashes_.insert(txHash);
auto [it, inserted] = knownTxHashes_.insert(txHash);
if (inserted)
{
JLOG(app_.journal("TxTrack").warn())
<< "GOT TX ledger=" << mSeq << " tx=" << txHash
<< " count=" << knownTxHashes_.size();
}
}
}
}

View File

@@ -23,6 +23,7 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/DecayingSample.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/container/aged_map.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/core/JobQueue.h>
@@ -205,15 +206,27 @@ public:
/** Scan the tracked inbound ledgers for one that reports a transaction.
mLock is held for the whole scan and progress is logged to the
"SubmitAndWait" journal.
@param txHash Hash of the transaction to look for.
@return The mLedgers key of the first entry that has a header, has not
failed and reports txHash via hasTx(); std::nullopt if none does.
*/
std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
auto const swj = app_.journal("SubmitAndWait");
ScopedLockType sl(mLock);
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching "
<< mLedgers.size() << " inbound ledgers";
for (auto const& [hash, inbound] : mLedgers)
{
// NOTE(review): the two-line condition below has no body of its own and
// repeats the hasHdr / failed / hasTx check computed right after it. It
// looks like the pre-change form of that check - confirm that only one
// of the two is meant to remain.
if (inbound->hasHeader() && !inbound->isFailed() &&
inbound->hasTx(txHash))
// hasTx() is only queried when the entry has a header and has not failed.
bool hasHdr = inbound->hasHeader();
bool failed = inbound->isFailed();
bool hasTx = hasHdr && !failed && inbound->hasTx(txHash);
JLOG(swj.trace())
<< "findTxLedger checking ledger seq=" << inbound->getSeq()
<< " hash=" << hash << " hasHeader=" << hasHdr
<< " failed=" << failed << " hasTx=" << hasTx;
if (hasTx)
{
JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash
<< " in ledger seq=" << inbound->getSeq();
// First match wins; the scan stops here.
return hash;
}
}
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND";
return std::nullopt;
}
@@ -248,6 +261,22 @@ public:
}
}
/** Register the closed ledger range [start, end] for TX-first fetching.
    Sequences inside a registered range are reported by isTxPrioritized().
    An inverted range (start > end) covers no ledger and is ignored.
    @param start First ledger sequence (inclusive)
    @param end Last ledger sequence (inclusive)
*/
void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
    // Reject an inverted range up front so the "added" message below is
    // never logged for a range that covers no ledger.
    if (start > end)
    {
        JLOG(j_.debug()) << "TX priority ignored for empty ledger range "
                         << start << "-" << end;
        return;
    }

    {
        std::lock_guard lock(txPriorityMutex_);
        txPriorityRange_.insert(ClosedInterval<std::uint32_t>(start, end));
    }

    // Logging does not touch the range set, so it is done after the mutex
    // has been released.
    JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-"
                     << end;

    // NOTE(review): nothing visible here erases from txPriorityRange_ -
    // confirm that stale ranges are pruned elsewhere.
}
/** Report whether seq is contained in txPriorityRange_.
    txPriorityMutex_ is held for the duration of the lookup.
    @param seq Ledger sequence to test.
    @return true if seq is covered by the range set.
*/
bool
isTxPrioritized(std::uint32_t seq) const override
{
    std::lock_guard guard(txPriorityMutex_);
    bool const inRange = boost::icl::contains(txPriorityRange_, seq);
    return inRange;
}
/*
This gets called when
"We got some data from an inbound ledger"
@@ -466,6 +495,11 @@ public:
}
else if ((la + std::chrono::minutes(1)) < start)
{
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removing ledger seq=" << it->second->getSeq()
<< " complete=" << it->second->isComplete()
<< " failed=" << it->second->isFailed()
<< " knownTxCount=" << it->second->knownTxCount();
stuffToSweep.push_back(it->second);
// shouldn't cause the actual final delete
// since we are holding a reference in the vector.
@@ -480,8 +514,8 @@ public:
beast::expire(mRecentFailures, kReacquireInterval);
}
JLOG(j_.debug())
<< "Swept " << stuffToSweep.size() << " out of " << total
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removed " << stuffToSweep.size() << " out of " << total
<< " inbound ledgers. Duration: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
m_clock.now() - start)
@@ -516,6 +550,10 @@ private:
std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
// Ledger ranges where TX fetching should be prioritized
mutable std::mutex txPriorityMutex_;
RangeSet<std::uint32_t> txPriorityRange_;
};
//------------------------------------------------------------------------------

View File

@@ -1222,7 +1222,7 @@ NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob)
Serializer s;
stx->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsCURRENT);
msg.set_status(protocol::tsNEW); // tsNEW = origin node could not validate
msg.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());

View File

@@ -34,6 +34,11 @@
namespace ripple {
// Custom journal partition for submit_and_wait debugging
// Configure with [rpc_startup] { "command": "log_level", "partition":
// "SubmitAndWait", "severity": "debug" }
// SWLOG(level) expands to JLOG on the "SubmitAndWait" journal, calling the
// severity accessor named by `level` (e.g. SWLOG(warn)). The expansion
// refers to `context.app`, so it can only be used where a variable named
// `context` with an `app` member is in scope.
#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level())
// {
// tx_blob: <hex-encoded signed transaction>
// timeout: <optional, max wait time in seconds, default 60>
@@ -118,15 +123,32 @@ doSubmitAndWait(RPC::JsonContext& context)
setCoroFetchTimeout(
std::chrono::duration_cast<std::chrono::milliseconds>(timeout / 2));
// Broadcast the transaction
SWLOG(warn) << "starting for tx=" << txHash
<< " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0)
<< " timeout=" << timeout.count() << "s";
// Poll for the transaction result
constexpr auto pollInterval = std::chrono::milliseconds(10);
auto const startTime = std::chrono::steady_clock::now();
// Broadcast IMMEDIATELY - don't wait for anything
SWLOG(warn) << "broadcasting tx=" << txHash;
auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob);
if (!broadcastResult)
{
SWLOG(warn) << "broadcast FAILED for tx=" << txHash;
jvResult[jss::error] = "broadcastFailed";
jvResult[jss::error_exception] =
"Failed to parse/broadcast transaction";
return jvResult;
}
SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash;
// Prioritize TX fetching for ledgers in our window
// This makes TX nodes fetch before state nodes for faster detection
auto const startSeq = context.ledgerMaster.getValidLedgerIndex();
auto const endSeq = lastLedgerSeq.value_or(startSeq + 20);
context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq);
jvResult[jss::tx_hash] = to_string(txHash);
jvResult[jss::broadcast] = true;
@@ -135,10 +157,6 @@ doSubmitAndWait(RPC::JsonContext& context)
std::optional<uint256> foundLedgerHash;
std::optional<std::uint32_t> foundLedgerSeq;
// Poll for the transaction result
constexpr auto pollInterval = std::chrono::milliseconds(500);
auto const startTime = std::chrono::steady_clock::now();
// Helper to check if a ledger is validated (has quorum)
auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool {
auto const quorum = context.app.validators().quorum();
@@ -152,8 +170,8 @@ doSubmitAndWait(RPC::JsonContext& context)
};
// Helper to read tx result from a ledger
auto readTxResult =
[&](std::shared_ptr<Ledger const> const& ledger) -> bool {
auto readTxResult = [&](std::shared_ptr<Ledger const> const& ledger,
std::string const& source) -> bool {
if (!ledger)
return false;
@@ -163,6 +181,7 @@ doSubmitAndWait(RPC::JsonContext& context)
jvResult[jss::status] = "success";
jvResult[jss::validated] = true;
jvResult["found_via"] = source;
jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none);
jvResult[jss::metadata] = stobj->getJson(JsonOptions::none);
jvResult[jss::ledger_hash] = to_string(ledger->info().hash);
@@ -177,7 +196,7 @@ doSubmitAndWait(RPC::JsonContext& context)
std::string human;
transResultInfo(result, token, human);
jvResult[jss::engine_result] = token;
jvResult[jss::engine_result_code] = static_cast<int>(result);
jvResult[jss::engine_result_code] = TERtoInt(result);
jvResult[jss::engine_result_message] = human;
}
@@ -194,58 +213,36 @@ doSubmitAndWait(RPC::JsonContext& context)
"Transaction not validated within timeout period";
if (foundLedgerSeq)
{
jvResult[jss::found_in_ledger] = *foundLedgerSeq;
jvResult["found_in_ledger"] = *foundLedgerSeq;
auto const valCount =
context.app.getValidations().numTrustedForLedger(
*foundLedgerHash);
auto const quorum = context.app.validators().quorum();
jvResult[jss::validation_count] =
jvResult["validation_count"] =
static_cast<unsigned int>(valCount);
jvResult[jss::quorum] = static_cast<unsigned int>(quorum);
jvResult["quorum"] = static_cast<unsigned int>(quorum);
}
return jvResult;
}
// Check LastLedgerSequence expiry using validated ledger
auto const validatedSeq = context.ledgerMaster.getValidLedgerIndex();
if (lastLedgerSeq && validatedSeq > *lastLedgerSeq)
{
jvResult[jss::error] = "transactionExpired";
jvResult[jss::error_message] =
"LastLedgerSequence exceeded before transaction was validated";
jvResult[jss::last_ledger_sequence] = *lastLedgerSeq;
jvResult[jss::validated_ledger] = validatedSeq;
return jvResult;
}
// If we already found the tx, check if its ledger is now validated
if (foundLedgerHash)
{
if (isLedgerValidated(*foundLedgerHash))
{
// Ledger is validated! Try to read the tx from it
// First try the partial ledger we have
// Ledger is validated! Read the tx result
auto ledger = context.app.getInboundLedgers().getPartialLedger(
*foundLedgerHash);
if (ledger && readTxResult(ledger))
if (ledger && readTxResult(ledger, "InboundLedgers"))
{
return jvResult;
}
// Try getting from LedgerMaster (may have been stored)
ledger = context.ledgerMaster.getLedgerByHash(*foundLedgerHash);
if (ledger && readTxResult(ledger))
{
return jvResult;
}
// Ledger validated but we can't read it yet - keep waiting
// The nodes might still be arriving
// Ledger validated but can't read yet - keep waiting
}
}
else
{
// Look for the transaction in inbound ledgers
// Search InboundLedgers for the tx
auto const ledgerHash =
context.app.getInboundLedgers().findTxLedger(txHash);
@@ -257,21 +254,33 @@ doSubmitAndWait(RPC::JsonContext& context)
if (ledger)
{
// Found the tx - record which ledger
foundLedgerHash = ledgerHash;
foundLedgerSeq = ledger->info().seq;
SWLOG(warn)
<< "FOUND tx in ledger seq=" << ledger->info().seq;
// Check if already validated
if (isLedgerValidated(*ledgerHash))
{
if (readTxResult(ledger))
if (readTxResult(ledger, "InboundLedgers"))
{
return jvResult;
}
}
// Otherwise continue waiting for validation
}
}
// Check LastLedgerSequence expiry
auto const currentValidatedSeq =
context.ledgerMaster.getValidLedgerIndex();
if (lastLedgerSeq && currentValidatedSeq > *lastLedgerSeq)
{
jvResult[jss::error] = "transactionExpired";
jvResult[jss::error_message] =
"LastLedgerSequence exceeded and transaction not found";
jvResult["last_ledger_sequence"] = *lastLedgerSeq;
jvResult["validated_ledger"] = currentValidatedSeq;
return jvResult;
}
}
// Sleep and continue polling

View File

@@ -132,11 +132,28 @@ public:
return {};
}
/** Stub override: reports no ledger for any transaction hash.
    @param txHash Ignored.
    @return Always an empty optional.
*/
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
    return {};
}
// Stub override: the body is empty, so both arguments are ignored.
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
}
// Stub override: the body is empty, so the requested range is not recorded.
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
}
// Stub override: returns false for every sequence.
virtual bool
isTxPrioritized(std::uint32_t seq) const override
{
return false;
}
virtual bool
gotLedgerData(
LedgerHash const& ledgerHash,