Merge branch 'pratik/otel-phase6-statsd' into pratik/otel-phase7-native-metrics

# Conflicts:
#	OpenTelemetryPlan/09-data-collection-reference.md
This commit is contained in:
Pratik Mankawde
2026-06-05 12:48:31 +01:00
12 changed files with 574 additions and 76 deletions

View File

@@ -44,6 +44,7 @@
#include <xrpl/tx/SignerEntries.h>
#include <xrpl/tx/apply.h>
#include <xrpl/tx/applySteps.h>
#include <xrpl/tx/detail/TxApplySpanNames.h>
#include <cstddef>
#include <cstdint>
@@ -1199,9 +1200,11 @@ Transactor::operator()()
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Transactions,
telemetry::seg::tx,
telemetry::makeStr("transactor"));
telemetry::tx_apply_span::op::transactor);
// "apply" — the third apply-pipeline stage, after preflight and preclaim.
span.setAttribute(telemetry::tx_apply_span::attr::stage, telemetry::tx_apply_span::val::apply);
if (auto const* fmt = TxFormats::getInstance().findByType(ctx_.tx.getTxnType()))
span.setAttribute("tx_type", fmt->getName().c_str());
span.setAttribute(telemetry::tx_apply_span::attr::txType, fmt->getName().c_str());
JLOG(j_.trace()) << "apply: " << ctx_.tx.getTransactionID();
@@ -1429,8 +1432,8 @@ Transactor::operator()()
JLOG(j_.trace()) << (applied ? "applied " : "not applied ") << transToken(result);
span.setAttribute("ter_result", transToken(result).c_str());
span.setAttribute("applied", applied);
span.setAttribute(telemetry::tx_apply_span::attr::terResult, transToken(result).c_str());
span.setAttribute(telemetry::tx_apply_span::attr::applied, applied);
return {result, applied, metadata};
}

View File

@@ -13,13 +13,16 @@
#include <xrpl/protocol/SeqProxy.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/tx/ApplyContext.h>
#include <xrpl/tx/Transactor.h>
#include <xrpl/tx/detail/TxApplySpanNames.h>
#include <cstdint>
#include <exception>
#include <memory>
#include <optional>
#include <string_view>
#include <utility>
#pragma push_macro("TRANSACTION")
#undef TRANSACTION
@@ -51,6 +54,47 @@ struct UnknownTxnType : std::exception
}
};
/** Look up the human-readable transaction type name for span attributes.
* Returns nullptr if the type is unknown so the caller can skip the
* attribute rather than emit an empty value.
*/
char const*
txTypeName(TxType txnType)
{
if (auto const* fmt = TxFormats::getInstance().findByType(txnType))
return fmt->getName().c_str();
return nullptr;
}
/** Create a deterministic-trace span for an apply-pipeline stage.
*
* The trace_id is derived from txID[0:16] so the preflight, preclaim, and
* transactor spans of one transaction share a trace even though they run
* sequentially and often on different threads. Sets the stage, tx_type, and
* (after the stage runs) ter_result attributes that drive the collector
* spanmetrics dimensions. A no-op when telemetry is disabled.
*
* @param name Full span name (tx_apply_span::preflight / ::preclaim).
* @param stage Stage attribute value (tx_apply_span::val::*).
* @param tx The transaction supplying the id and type.
*/
[[nodiscard]] telemetry::SpanGuard
makeStageSpan(std::string_view name, std::string_view stage, STTx const& tx)
{
auto const txID = tx.getTransactionID();
auto span = telemetry::SpanGuard::hashSpan(
telemetry::TraceCategory::Transactions, name, txID.data(), txID.kBytes);
// Guard the type lookup behind the active check: preflight runs for every
// transaction, so findByType() must not run when tracing is off/disabled.
if (span)
{
span.setAttribute(telemetry::tx_apply_span::attr::stage, stage);
if (char const* typeName = txTypeName(tx.getTxnType()))
span.setAttribute(telemetry::tx_apply_span::attr::txType, typeName);
}
return span;
}
// Call a lambda with the concrete transaction type as a template parameter
// throw an "UnknownTxnType" exception on error
template <class F>
@@ -133,82 +177,122 @@ consequencesHelper(PreflightContext const& ctx)
static std::pair<NotTEC, TxConsequences>
invokePreflight(PreflightContext const& ctx)
{
// Trace the preflight stage. The span shares the transaction's
// deterministic trace_id so it correlates with preclaim and transactor.
auto span = makeStageSpan(
telemetry::tx_apply_span::preflight, telemetry::tx_apply_span::val::preflight, ctx.tx);
try
{
return withTxnType(ctx.rules, ctx.tx.getTxnType(), [&]<typename T>() {
auto result = withTxnType(ctx.rules, ctx.tx.getTxnType(), [&]<typename T>() {
auto const tec = Transactor::invokePreflight<T>(ctx);
return std::make_pair(
tec, isTesSuccess(tec) ? consequencesHelper<T>(ctx) : TxConsequences{tec});
});
if (span)
{
span.setAttribute(
telemetry::tx_apply_span::attr::terResult, transToken(result.first).c_str());
}
return result;
}
catch (UnknownTxnType const& e)
{
// Should never happen
// LCOV_EXCL_START
JLOG(ctx.j.fatal()) << "Unknown transaction type in preflight: " << e.txnType;
span.recordException(e);
UNREACHABLE("xrpl::invokePreflight : unknown transaction type");
return {temUNKNOWN, TxConsequences{temUNKNOWN}};
// LCOV_EXCL_STOP
}
catch (std::exception const& e)
{
// The caller's preflight() maps this to tefEXCEPTION. Record it on the
// span before unwinding so per-stage error counts include exceptions.
span.setAttribute(
telemetry::tx_apply_span::attr::terResult, transToken(tefEXCEPTION).c_str());
span.recordException(e);
throw;
}
}
static TER
invokePreclaim(PreclaimContext const& ctx)
{
// Trace the preclaim stage under the transaction's deterministic trace_id.
auto span = makeStageSpan(
telemetry::tx_apply_span::preclaim, telemetry::tx_apply_span::val::preclaim, ctx.tx);
try
{
// use name hiding to accomplish compile-time polymorphism of static
// class functions for Transactor and derived classes.
return withTxnType(ctx.view.rules(), ctx.tx.getTxnType(), [&]<typename T>() -> TER {
// preclaim functionality is divided into two sections:
// 1. Up to and including the signature check: returns NotTEC.
// All transaction checks before and including checkSign
// MUST return NotTEC, or something more restrictive.
// Allowing tec results in these steps risks theft or
// destruction of funds, as a fee will be charged before the
// signature is checked.
// 2. After the signature check: returns TER.
TER const preclaimTer =
withTxnType(ctx.view.rules(), ctx.tx.getTxnType(), [&]<typename T>() -> TER {
// preclaim functionality is divided into two sections:
// 1. Up to and including the signature check: returns NotTEC.
// All transaction checks before and including checkSign
// MUST return NotTEC, or something more restrictive.
// Allowing tec results in these steps risks theft or
// destruction of funds, as a fee will be charged before the
// signature is checked.
// 2. After the signature check: returns TER.
// If the transactor requires a valid account and the
// transaction doesn't list one, preflight will have already
// a flagged a failure.
auto const id = ctx.tx.getAccountID(sfAccount);
// If the transactor requires a valid account and the
// transaction doesn't list one, preflight will have already
// a flagged a failure.
auto const id = ctx.tx.getAccountID(sfAccount);
if (id != beast::kZero)
{
if (NotTEC const preSigResult = [&]() -> NotTEC {
if (NotTEC const result = T::checkSeqProxy(ctx.view, ctx.tx, ctx.j))
return result;
if (id != beast::kZero)
{
if (NotTEC const preSigResult = [&]() -> NotTEC {
if (NotTEC const result = T::checkSeqProxy(ctx.view, ctx.tx, ctx.j))
return result;
if (NotTEC const result = T::checkPriorTxAndLastLedger(ctx))
return result;
if (NotTEC const result = T::checkPriorTxAndLastLedger(ctx))
return result;
if (NotTEC const result = T::checkPermission(ctx.view, ctx.tx))
return result;
if (NotTEC const result = T::checkPermission(ctx.view, ctx.tx))
return result;
if (NotTEC const result = T::checkSign(ctx))
return result;
if (NotTEC const result = T::checkSign(ctx))
return result;
return tesSUCCESS;
}())
return preSigResult;
return tesSUCCESS;
}())
return preSigResult;
if (TER const result = T::checkFee(ctx, calculateBaseFee(ctx.view, ctx.tx)))
return result;
}
if (TER const result = T::checkFee(ctx, calculateBaseFee(ctx.view, ctx.tx)))
return result;
}
return T::preclaim(ctx);
});
return T::preclaim(ctx);
});
if (span)
{
span.setAttribute(
telemetry::tx_apply_span::attr::terResult, transToken(preclaimTer).c_str());
}
return preclaimTer;
}
catch (UnknownTxnType const& e)
{
// Should never happen
// LCOV_EXCL_START
JLOG(ctx.j.fatal()) << "Unknown transaction type in preclaim: " << e.txnType;
span.recordException(e);
UNREACHABLE("xrpl::invokePreclaim : unknown transaction type");
return temUNKNOWN;
// LCOV_EXCL_STOP
}
catch (std::exception const& e)
{
// The caller's preclaim() maps this to tefEXCEPTION. Record it on the
// span before unwinding so per-stage error counts include exceptions.
span.setAttribute(
telemetry::tx_apply_span::attr::terResult, transToken(tefEXCEPTION).c_str());
span.recordException(e);
throw;
}
}
/**

View File

@@ -0,0 +1,52 @@
#include <xrpl/tx/detail/TxApplySpanNames.h>
#include <gtest/gtest.h>
#include <string_view>
/** Contract tests for the transaction apply-pipeline span constants.
*
* The span names and attribute keys in TxApplySpanNames.h are a cross-component
* contract: the collector spanmetrics connector aggregates on these exact
* strings (dimensions tx_type, ter_result, stage) and the Grafana
* transaction-overview dashboard queries them. A silent rename here would
* break per-stage metrics with no compile error, so these tests pin the
* literal values. They need no telemetry runtime and run in every build.
*/
using namespace xrpl::telemetry;
TEST(TxApplySpanNames, span_names_are_dot_qualified)
{
// Full span names feed SpanGuard::hashSpan() in applySteps.cpp.
EXPECT_EQ(std::string_view(tx_apply_span::preflight), "tx.preflight");
EXPECT_EQ(std::string_view(tx_apply_span::preclaim), "tx.preclaim");
}
TEST(TxApplySpanNames, operation_suffixes)
{
// Suffix used with SpanGuard::span(cat, seg::tx, suffix) in Transactor.cpp.
EXPECT_EQ(std::string_view(tx_apply_span::op::preflight), "preflight");
EXPECT_EQ(std::string_view(tx_apply_span::op::preclaim), "preclaim");
EXPECT_EQ(std::string_view(tx_apply_span::op::transactor), "transactor");
}
TEST(TxApplySpanNames, attribute_keys_match_collector_dimensions)
{
// These keys MUST match docker/telemetry/otel-collector-config.yaml
// spanmetrics dimensions and TxSpanNames.h (so both span sets aggregate
// under one dimension).
EXPECT_EQ(std::string_view(tx_apply_span::attr::stage), "stage");
EXPECT_EQ(std::string_view(tx_apply_span::attr::txType), "tx_type");
EXPECT_EQ(std::string_view(tx_apply_span::attr::terResult), "ter_result");
EXPECT_EQ(std::string_view(tx_apply_span::attr::applied), "applied");
}
TEST(TxApplySpanNames, stage_values_are_the_three_pipeline_stages)
{
// The stage attribute carries exactly these three values; they become the
// spanmetrics `stage` dimension cardinality (3) and the dashboard filter.
EXPECT_EQ(std::string_view(tx_apply_span::val::preflight), "preflight");
EXPECT_EQ(std::string_view(tx_apply_span::val::preclaim), "preclaim");
EXPECT_EQ(std::string_view(tx_apply_span::val::apply), "apply");
}

View File

@@ -607,7 +607,8 @@ TxQ::tryClearAccountQueueUpThruTx(
if (txResult.applied)
{
// All of the queued transactions applied, so remove them from the
// queue.
// queue. `dist` queued txs preceded the current one in the batch.
span.setAttribute(txq_span::attr::numCleared, static_cast<std::int64_t>(dist));
endTxIter = erase(accountIter->second, beginTxIter, endTxIter);
// If `tx` is replacing a queued tx, delete that one, too.
if (endTxIter != accountIter->second.transactions.end() && endTxIter->first == tSeqProx)
@@ -744,6 +745,9 @@ TxQ::apply(
span.setAttribute(txq_span::attr::txHash, to_string(tx->getTransactionID()).c_str());
if (auto const* fmt = TxFormats::getInstance().findByType(tx->getTxnType()))
span.setAttribute(txq_span::attr::txType, fmt->getName().c_str());
// Default outcome; overridden below on the direct-apply and queued paths.
// Every other early return leaves the tx rejected from the queue.
span.setAttribute(txq_span::attr::txqStatus, txq_span::val::rejected);
NumberSO const stNumberSO{view.rules().enabled(fixUniversalNumber)};
@@ -757,7 +761,10 @@ TxQ::apply(
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
{
span.setAttribute(txq_span::attr::txqStatus, txq_span::val::appliedDirect);
return *directApplied;
}
if ((flags & TapDryRun) != 0u)
return {telCAN_NOT_QUEUE, false};
@@ -884,6 +891,10 @@ TxQ::apply(
auto const metricsSnapshot = feeMetrics_.getSnapshot();
auto const feeLevelPaid = getFeeLevelPaid(view, *tx);
auto const requiredFeeLevel = getRequiredFeeLevel(view, flags, metricsSnapshot, lock);
span.setAttribute(
txq_span::attr::feeLevelPaid, static_cast<std::int64_t>(feeLevelPaid.value()));
span.setAttribute(
txq_span::attr::requiredFeeLevel, static_cast<std::int64_t>(requiredFeeLevel.value()));
// Is there a blocker already in the account's queue? If so, don't
// allow additional transactions in the queue.
@@ -1217,6 +1228,7 @@ TxQ::apply(
/* Can't erase (*replacedTxIter) here because success
implies that it has already been deleted.
*/
span.setAttribute(txq_span::attr::txqStatus, txq_span::val::applied);
return result;
}
}
@@ -1332,6 +1344,7 @@ TxQ::apply(
<< " to queue."
<< " Flags: " << flags;
span.setAttribute(txq_span::attr::txqStatus, txq_span::val::queued);
return {terQUEUED, false};
}
@@ -1366,18 +1379,21 @@ TxQ::processClosedLedger(Application& app, ReadView const& view, bool timeLeap)
maxSize_ = std::max(snapshot.txnsExpected * setup_.ledgersInQueue, setup_.queueSizeMin);
// Remove any queued candidates whose LastLedgerSequence has gone by.
std::int64_t expiredCount = 0;
for (auto candidateIter = byFee_.begin(); candidateIter != byFee_.end();)
{
if (candidateIter->lastValid && *candidateIter->lastValid <= ledgerSeq)
{
byAccount_.at(candidateIter->account).dropPenalty = true;
candidateIter = erase(candidateIter);
++expiredCount;
}
else
{
++candidateIter;
}
}
span.setAttribute(txq_span::attr::expiredCount, expiredCount);
// Remove any TxQAccounts that don't have candidates
// under them

View File

@@ -15,12 +15,14 @@
* | +--------------------------------------------------+ |
* | | txq.enqueue | |
* | | TxQ::apply() | |
* | | attrs: tx_hash, status, fee_level | |
* | | attrs: tx_hash, tx_type, txq_status, | |
* | | fee_level_paid, required_fee_level | |
* | | | |
* | | +-------------------+ +----------------------+ | |
* | | | txq.apply_direct | | txq.batch_clear | | |
* | | | tryDirectApply() | | tryClearAccount...() | | |
* | | +-------------------+ +----------------------+ | |
* | | +-------------------+ | attrs: num_cleared | | |
* | | +----------------------+ | |
* | +--------------------------------------------------+ |
* +-------------------------------------------------------+
*