feat(telemetry): add TxQ tracing with 6 spans (Tasks 3.9/3.10)

Instrument the transaction queue lifecycle with full span coverage:

- txq.enqueue: wraps TxQ::apply() enqueue/direct/reject decision
  with tx_hash attribute
- txq.apply_direct: wraps TxQ::tryDirectApply() fast-path
- txq.batch_clear: wraps TxQ::tryClearAccountQueueUpThruTx()
  batch clear on high-fee tx
- txq.accept: wraps TxQ::accept() ledger-close dequeue cycle
  with queue_size attribute
- txq.accept_tx: per-tx span inside accept loop with tx_hash,
  ter_code, retries_remaining attributes
- txq.cleanup: wraps TxQ::processClosedLedger() fee metric updates
  and tx expiration with ledger_seq attribute

New file: TxQSpanNames.h with compile-time constants.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-04-21 15:40:21 +01:00
parent c585d9b66c
commit 2fb165cd54
2 changed files with 149 additions and 0 deletions

View File

@@ -1,4 +1,5 @@
#include <xrpld/app/misc/TxQ.h>
#include <xrpld/telemetry/TxQSpanNames.h>
#include <xrpld/app/ledger/OpenLedger.h>
#include <xrpld/app/main/Application.h>
@@ -29,6 +30,8 @@
#include <xrpl/protocol/Units.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/st.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/tx/apply.h>
#include <xrpl/tx/applySteps.h>
@@ -528,6 +531,10 @@ TxQ::tryClearAccountQueueUpThruTx(
FeeMetrics::Snapshot const& metricsSnapshot,
beast::Journal j)
{
using namespace telemetry;
auto span = SpanGuard::span(
TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::batchClear);
SeqProxy const tSeqProx{tx.getSeqProxy()};
XRPL_ASSERT(
beginTxIter != accountIter->second.transactions.end(),
@@ -730,6 +737,11 @@ TxQ::apply(
ApplyFlags flags,
beast::Journal j)
{
using namespace telemetry;
auto span =
SpanGuard::span(TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::enqueue);
span.setAttribute(txq_span::attr::txHash, to_string(tx->getTransactionID()).c_str());
NumberSO const stNumberSO{view.rules().enabled(fixUniversalNumber)};
// See if the transaction is valid, properly formed,
@@ -1332,6 +1344,11 @@ TxQ::apply(
void
TxQ::processClosedLedger(Application& app, ReadView const& view, bool timeLeap)
{
using namespace telemetry;
auto span =
SpanGuard::span(TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::cleanup);
span.setAttribute(txq_span::attr::ledgerSeq, static_cast<int64_t>(view.header().seq));
std::lock_guard const lock(mutex_);
feeMetrics_.update(app, view, timeLeap, setup_);
@@ -1403,6 +1420,11 @@ TxQ::processClosedLedger(Application& app, ReadView const& view, bool timeLeap)
bool
TxQ::accept(Application& app, OpenView& view)
{
using namespace telemetry;
auto span =
SpanGuard::span(TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::accept);
span.setAttribute(txq_span::attr::queueSize, static_cast<int64_t>(byFee_.size()));
/* Move transactions from the queue from largest fee level to smallest.
As we add more transactions, the required fee level will increase.
Stop when the transaction fee level gets lower than the required fee
@@ -1440,7 +1462,15 @@ TxQ::accept(Application& app, OpenView& view)
JLOG(j_.trace()) << "Applying queued transaction " << candidateIter->txID
<< " to open ledger.";
auto txSpan = SpanGuard::span(
TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::acceptTx);
txSpan.setAttribute(txq_span::attr::txHash, to_string(candidateIter->txID).c_str());
txSpan.setAttribute(
txq_span::attr::retriesRemaining,
static_cast<int64_t>(candidateIter->retriesRemaining));
auto const [txnResult, didApply, _metadata] = candidateIter->apply(app, view, j_);
txSpan.setAttribute(txq_span::attr::terCode, transToken(txnResult).c_str());
if (didApply)
{
@@ -1650,6 +1680,10 @@ TxQ::tryDirectApply(
ApplyFlags flags,
beast::Journal j)
{
using namespace telemetry;
auto span = SpanGuard::span(
TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::applyDirect);
auto const account = (*tx)[sfAccount];
auto const sleAccount = view.read(keylet::account(account));

View File

@@ -0,0 +1,115 @@
#pragma once
/** Compile-time span name constants for Transaction Queue tracing.
*
* Covers the TxQ lifecycle: enqueue decisions, direct apply, batch
* clear, ledger-close accept loop, per-tx apply, and cleanup.
*
* Span hierarchy:
*
* Transaction submission:
*
* +-------------------------------------------------------+
* | tx.process (existing, from TxSpanNames.h) |
* | |
* | +--------------------------------------------------+ |
* | | txq.enqueue | |
* | | TxQ::apply() | |
* | | attrs: tx_hash, status, fee_level | |
* | | | |
* | | +-------------------+ +----------------------+ | |
* | | | txq.apply_direct | | txq.batch_clear | | |
* | | | tryDirectApply() | | tryClearAccount...() | | |
* | | +-------------------+ +----------------------+ | |
* | +--------------------------------------------------+ |
* +-------------------------------------------------------+
*
* Ledger close (consensus thread):
*
* +-------------------------------------------------------+
* | txq.accept |
* | TxQ::accept() |
* | attrs: queue_size, ledger_changed |
* | |
* | +--------------------------------------------------+ |
* | | txq.accept.tx (per queued transaction) | |
* | | attrs: tx_hash, ter_code, retries_remaining | |
* | +--------------------------------------------------+ |
* +-------------------------------------------------------+
*
* Post-close cleanup:
*
* +-------------------------------------------------------+
* | txq.cleanup |
* | TxQ::processClosedLedger() |
* | attrs: ledger_seq, expired_count |
* +-------------------------------------------------------+
*/
#include <xrpl/telemetry/SpanNames.h>
namespace xrpl {
namespace telemetry {
namespace txq_span {
// ===== Span prefixes =======================================================
namespace prefix {
/// "txq" — root prefix for transaction queue spans.
inline constexpr auto txq = makeStr("txq");
} // namespace prefix
// ===== Span operation suffixes =============================================
namespace op {
inline constexpr auto enqueue = makeStr("enqueue");
inline constexpr auto applyDirect = makeStr("apply_direct");
inline constexpr auto batchClear = makeStr("batch_clear");
inline constexpr auto accept = makeStr("accept");
inline constexpr auto acceptTx = makeStr("accept_tx");
inline constexpr auto cleanup = makeStr("cleanup");
} // namespace op
// ===== Attribute keys ======================================================
namespace attr {
inline constexpr auto xrplTxq = join(seg::xrpl, makeStr("txq"));
/// "xrpl.txq.tx_hash"
inline constexpr auto txHash = join(xrplTxq, makeStr("tx_hash"));
/// "xrpl.txq.status"
inline constexpr auto status = join(xrplTxq, makeStr("status"));
/// "xrpl.txq.fee_level_paid"
inline constexpr auto feeLevelPaid = join(xrplTxq, makeStr("fee_level_paid"));
/// "xrpl.txq.required_fee_level"
inline constexpr auto requiredFeeLevel = join(xrplTxq, makeStr("required_fee_level"));
/// "xrpl.txq.queue_size"
inline constexpr auto queueSize = join(xrplTxq, makeStr("queue_size"));
/// "xrpl.txq.ledger_changed"
inline constexpr auto ledgerChanged = join(xrplTxq, makeStr("ledger_changed"));
/// "xrpl.txq.ledger_seq"
inline constexpr auto ledgerSeq = join(xrplTxq, makeStr("ledger_seq"));
/// "xrpl.txq.expired_count"
inline constexpr auto expiredCount = join(xrplTxq, makeStr("expired_count"));
/// "xrpl.txq.ter_code"
inline constexpr auto terCode = join(xrplTxq, makeStr("ter_code"));
/// "xrpl.txq.retries_remaining"
inline constexpr auto retriesRemaining = join(xrplTxq, makeStr("retries_remaining"));
/// "xrpl.txq.num_cleared"
inline constexpr auto numCleared = join(xrplTxq, makeStr("num_cleared"));
} // namespace attr
// ===== Attribute values ====================================================
namespace val {
inline constexpr auto queued = makeStr("queued");
inline constexpr auto appliedDirect = makeStr("applied_direct");
inline constexpr auto rejected = makeStr("rejected");
inline constexpr auto applied = makeStr("applied");
inline constexpr auto failed = makeStr("failed");
inline constexpr auto retried = makeStr("retried");
} // namespace val
} // namespace txq_span
} // namespace telemetry
} // namespace xrpl