feat(telemetry): Phase 3 transaction tracing with protobuf context propagation

- TraceContext protobuf message for cross-node trace propagation
  (added to TMTransaction, TMProposeSet, TMValidation at field 1001)
- TraceContextPropagator.h: inline extractFromProtobuf/injectToProtobuf
- PeerImp::handleTransaction: tx.receive span with peer.id, peer.version,
  tx.hash, tx.suppressed, tx.status attributes
- NetworkOPsImp::processTransaction: tx.process span with tx.hash,
  tx.local, tx.path attributes
- Tempo search filters for tx.hash, tx.local, tx.status
- Unit tests for TraceContextPropagator (round-trip, edge cases)
- Levelization: xrpld.app/overlay > xrpld.telemetry dependencies

Translated from macro API (XRPL_TRACE_TX/SET_ATTR) to SpanGuard factory
pattern introduced in Phase 1c.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-04-20 16:39:56 +01:00
parent ed8164d502
commit 19eead6955
7 changed files with 304 additions and 0 deletions

View File

@@ -237,6 +237,7 @@ xrpld.app > xrpl.basics
xrpld.app > xrpl.core
xrpld.app > xrpld.consensus
xrpld.app > xrpld.core
xrpld.app > xrpld.telemetry
xrpld.app > xrpl.json
xrpld.app > xrpl.ledger
xrpld.app > xrpl.net
@@ -262,6 +263,7 @@ xrpld.overlay > xrpl.core
xrpld.overlay > xrpld.consensus
xrpld.overlay > xrpld.core
xrpld.overlay > xrpld.peerfinder
xrpld.overlay > xrpld.telemetry
xrpld.overlay > xrpl.json
xrpld.overlay > xrpl.ledger
xrpld.overlay > xrpl.protocol

View File

@@ -7,6 +7,7 @@
# Each phase adds filters for the span attributes it introduces.
# Phase 1b (infra): Base filters — node identity, service, span name, status.
# Phase 2 (RPC): RPC command, status, role filters.
# Phase 3 (TX): Transaction hash, local/peer origin, status.
apiVersion: 1
@@ -117,3 +118,19 @@ datasources:
operator: "="
scope: span
type: dynamic
# Phase 3: Transaction tracing filters
- id: tx-hash
tag: xrpl.tx.hash
operator: "="
scope: span
type: static
- id: tx-origin
tag: xrpl.tx.local
operator: "="
scope: span
type: dynamic
- id: tx-status
tag: xrpl.tx.status
operator: "="
scope: span
type: dynamic

View File

@@ -85,6 +85,15 @@ message TMPublicKey {
// If you want to send an amount that is greater than any single address of yours
// you must first combine coins from one address to another.
// Trace context for OpenTelemetry distributed tracing across nodes.
// Uses W3C Trace Context format internally.
message TraceContext {
optional bytes trace_id = 1; // 16-byte trace identifier
optional bytes span_id = 2; // 8-byte parent span identifier
optional uint32 trace_flags = 3; // bit 0 = sampled
optional string trace_state = 4; // W3C tracestate header value
}
enum TransactionStatus {
tsNEW = 1; // origin node did/could not validate
tsCURRENT = 2; // scheduled to go in this ledger
@@ -101,6 +110,9 @@ message TMTransaction {
required TransactionStatus status = 2;
optional uint64 receiveTimestamp = 3;
optional bool deferred = 4; // not applied to open ledger
// Optional trace context for OpenTelemetry distributed tracing
optional TraceContext trace_context = 1001;
}
message TMTransactions {
@@ -149,6 +161,9 @@ message TMProposeSet {
// Number of hops traveled
optional uint32 hops = 12 [deprecated = true];
// Optional trace context for OpenTelemetry distributed tracing
optional TraceContext trace_context = 1001;
}
enum TxSetStatus {
@@ -194,6 +209,9 @@ message TMValidation {
// Number of hops traveled
optional uint32 hops = 3 [deprecated = true];
// Optional trace context for OpenTelemetry distributed tracing
optional TraceContext trace_context = 1001;
}
// An array of Endpoint messages

View File

@@ -0,0 +1,94 @@
#pragma once
/** Utilities for trace context propagation across nodes.
Provides serialization/deserialization of OTel trace context to/from
Protocol Buffer TraceContext messages (P2P cross-node propagation).
Only compiled when XRPL_ENABLE_TELEMETRY is defined.
*/
#ifdef XRPL_ENABLE_TELEMETRY
#include <xrpl/proto/xrpl.pb.h>
#include <opentelemetry/context/context.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>
#include <cstdint>
namespace xrpl {
namespace telemetry {
/** Extract OTel context from a protobuf TraceContext message.
@param proto The protobuf TraceContext received from a peer.
@return An OTel Context with the extracted parent span, or an empty
context if the protobuf fields are missing or invalid.
*/
inline opentelemetry::context::Context
extractFromProtobuf(protocol::TraceContext const& proto)
{
namespace trace = opentelemetry::trace;
if (!proto.has_trace_id() || proto.trace_id().size() != 16 || !proto.has_span_id() ||
proto.span_id().size() != 8)
{
return opentelemetry::context::Context{};
}
auto const* rawTraceId = reinterpret_cast<std::uint8_t const*>(proto.trace_id().data());
auto const* rawSpanId = reinterpret_cast<std::uint8_t const*>(proto.span_id().data());
trace::TraceId traceId(opentelemetry::nostd::span<std::uint8_t const, 16>(rawTraceId, 16));
trace::SpanId spanId(opentelemetry::nostd::span<std::uint8_t const, 8>(rawSpanId, 8));
// Default to not-sampled (0x00) per W3C Trace Context spec when
// the trace_flags field is absent.
trace::TraceFlags flags(
proto.has_trace_flags() ? static_cast<std::uint8_t>(proto.trace_flags())
: static_cast<std::uint8_t>(0));
trace::SpanContext spanCtx(traceId, spanId, flags, /* remote = */ true);
return opentelemetry::context::Context{}.SetValue(
trace::kSpanKey,
opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(spanCtx)));
}
/** Inject the current span's trace context into a protobuf TraceContext.
@param ctx The OTel context containing the span to propagate.
@param proto The protobuf TraceContext to populate.
*/
inline void
injectToProtobuf(opentelemetry::context::Context const& ctx, protocol::TraceContext& proto)
{
namespace trace = opentelemetry::trace;
auto span = trace::GetSpan(ctx);
if (!span)
return;
auto const& spanCtx = span->GetContext();
if (!spanCtx.IsValid())
return;
// Serialize trace_id (16 bytes)
auto const& traceId = spanCtx.trace_id();
proto.set_trace_id(traceId.Id().data(), trace::TraceId::kSize);
// Serialize span_id (8 bytes)
auto const& spanId = spanCtx.span_id();
proto.set_span_id(spanId.Id().data(), trace::SpanId::kSize);
// Serialize flags
proto.set_trace_flags(spanCtx.trace_flags().flags());
}
} // namespace telemetry
} // namespace xrpl
#endif // XRPL_ENABLE_TELEMETRY

View File

@@ -0,0 +1,155 @@
#include <gtest/gtest.h>
#ifdef XRPL_ENABLE_TELEMETRY
#include <xrpl/telemetry/TraceContextPropagator.h>
#include <opentelemetry/context/context.h>
#include <opentelemetry/nostd/span.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>
#include <cstring>
namespace trace = opentelemetry::trace;
TEST(TraceContextPropagator, round_trip)
{
std::uint8_t traceIdBuf[16] = {
0x01,
0x02,
0x03,
0x04,
0x05,
0x06,
0x07,
0x08,
0x09,
0x0a,
0x0b,
0x0c,
0x0d,
0x0e,
0x0f,
0x10};
std::uint8_t spanIdBuf[8] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x11, 0x22};
trace::TraceId traceId(opentelemetry::nostd::span<uint8_t const, 16>(traceIdBuf, 16));
trace::SpanId spanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBuf, 8));
trace::TraceFlags flags(trace::TraceFlags::kIsSampled);
trace::SpanContext spanCtx(traceId, spanId, flags, true);
auto ctx = opentelemetry::context::Context{}.SetValue(
trace::kSpanKey,
opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(spanCtx)));
protocol::TraceContext proto;
xrpl::telemetry::injectToProtobuf(ctx, proto);
EXPECT_TRUE(proto.has_trace_id());
EXPECT_EQ(proto.trace_id().size(), 16u);
EXPECT_TRUE(proto.has_span_id());
EXPECT_EQ(proto.span_id().size(), 8u);
EXPECT_EQ(proto.trace_flags(), static_cast<uint32_t>(trace::TraceFlags::kIsSampled));
EXPECT_EQ(std::memcmp(proto.trace_id().data(), traceIdBuf, 16), 0);
EXPECT_EQ(std::memcmp(proto.span_id().data(), spanIdBuf, 8), 0);
auto extractedCtx = xrpl::telemetry::extractFromProtobuf(proto);
auto extractedSpan = trace::GetSpan(extractedCtx);
ASSERT_NE(extractedSpan, nullptr);
auto const& extracted = extractedSpan->GetContext();
EXPECT_TRUE(extracted.IsValid());
EXPECT_TRUE(extracted.IsRemote());
EXPECT_EQ(extracted.trace_id(), traceId);
EXPECT_EQ(extracted.span_id(), spanId);
EXPECT_TRUE(extracted.trace_flags().IsSampled());
}
TEST(TraceContextPropagator, extract_empty_protobuf)
{
protocol::TraceContext proto;
auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
auto span = trace::GetSpan(ctx);
if (span)
{
EXPECT_FALSE(span->GetContext().IsValid());
}
}
TEST(TraceContextPropagator, extract_wrong_size_trace_id)
{
protocol::TraceContext proto;
proto.set_trace_id(std::string(8, '\x01'));
proto.set_span_id(std::string(8, '\xaa'));
auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
auto span = trace::GetSpan(ctx);
if (span)
{
EXPECT_FALSE(span->GetContext().IsValid());
}
}
TEST(TraceContextPropagator, extract_wrong_size_span_id)
{
protocol::TraceContext proto;
proto.set_trace_id(std::string(16, '\x01'));
proto.set_span_id(std::string(4, '\xaa'));
auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
auto span = trace::GetSpan(ctx);
if (span)
{
EXPECT_FALSE(span->GetContext().IsValid());
}
}
TEST(TraceContextPropagator, inject_invalid_span)
{
auto ctx = opentelemetry::context::Context{};
protocol::TraceContext proto;
xrpl::telemetry::injectToProtobuf(ctx, proto);
EXPECT_FALSE(proto.has_trace_id());
EXPECT_FALSE(proto.has_span_id());
}
TEST(TraceContextPropagator, flags_preservation)
{
std::uint8_t traceIdBuf[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
std::uint8_t spanIdBuf[8] = {1, 2, 3, 4, 5, 6, 7, 8};
// Test with flags NOT sampled (flags = 0)
trace::TraceFlags flags(0);
trace::SpanContext spanCtx(
trace::TraceId(opentelemetry::nostd::span<uint8_t const, 16>(traceIdBuf, 16)),
trace::SpanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBuf, 8)),
flags,
true);
auto ctx = opentelemetry::context::Context{}.SetValue(
trace::kSpanKey,
opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(spanCtx)));
protocol::TraceContext proto;
xrpl::telemetry::injectToProtobuf(ctx, proto);
EXPECT_EQ(proto.trace_flags(), 0u);
auto extracted = xrpl::telemetry::extractFromProtobuf(proto);
auto span = trace::GetSpan(extracted);
ASSERT_NE(span, nullptr);
EXPECT_FALSE(span->GetContext().trace_flags().IsSampled());
}
#else // XRPL_ENABLE_TELEMETRY not defined
TEST(TraceContextPropagator, compiles_without_telemetry)
{
SUCCEED();
}
#endif // XRPL_ENABLE_TELEMETRY

View File

@@ -114,6 +114,7 @@
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/Manifest.h>
#include <xrpl/shamap/SHAMap.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/tx/apply.h>
#include <boost/asio/error.hpp>
@@ -1311,6 +1312,11 @@ NetworkOPsImp::processTransaction(
bool bLocal,
FailHard failType)
{
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Transactions, "tx", "process");
span.setAttribute("xrpl.tx.hash", to_string(transaction->getID()).c_str());
span.setAttribute("xrpl.tx.local", bLocal);
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
// preProcessTransaction can change our pointer
@@ -1319,10 +1325,12 @@ NetworkOPsImp::processTransaction(
if (bLocal)
{
span.setAttribute("xrpl.tx.path", "sync");
doTransactionSync(transaction, bUnlimited, failType);
}
else
{
span.setAttribute("xrpl.tx.path", "async");
doTransactionAsync(transaction, bUnlimited, failType);
}
}

View File

@@ -62,6 +62,7 @@
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/shamap/SHAMapNodeID.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/tx/apply.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -1421,6 +1422,12 @@ PeerImp::handleTransaction(
bool eraseTxQueue,
bool batch)
{
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Transactions, "tx", "receive");
span.setAttribute("xrpl.peer.id", static_cast<int64_t>(id_));
if (auto const version = getVersion(); !version.empty())
span.setAttribute("xrpl.peer.version", version.c_str());
XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs"));
if (tracking_.load() == Tracking::diverged)
return;
@@ -1439,6 +1446,7 @@ PeerImp::handleTransaction(
{
auto stx = std::make_shared<STTx const>(sit);
uint256 const txID = stx->getTransactionID();
span.setAttribute("xrpl.tx.hash", to_string(txID).c_str());
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
@@ -1472,9 +1480,11 @@ PeerImp::handleTransaction(
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
{
span.setAttribute("xrpl.tx.suppressed", true);
// we have seen this transaction recently
if (any(flags & HashRouterFlags::BAD))
{
span.setAttribute("xrpl.tx.status", "known_bad");
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}