feat(telemetry): Phase 3 transaction tracing with protobuf context propagation

- TraceContext protobuf message for cross-node trace propagation
  (added to TMTransaction, TMProposeSet, TMValidation at field 1001)
- TraceContextPropagator.h: inline extractFromProtobuf/injectToProtobuf
- PeerImp::handleTransaction: tx.receive span with peer.id, peer.version,
  tx.hash, tx.suppressed, tx.status attributes
- NetworkOPsImp::processTransaction: tx.process span with tx.hash,
  tx.local, tx.path attributes
- Tempo search filters for tx.hash, tx.local, tx.status
- Unit tests for TraceContextPropagator (round-trip, edge cases)
- Levelization: xrpld.app/overlay > xrpld.telemetry dependencies

Translated from macro API (XRPL_TRACE_TX/SET_ATTR) to SpanGuard factory
pattern introduced in Phase 1c.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-04-20 16:39:56 +01:00
parent 3ed22580fe
commit 3508917f17
7 changed files with 304 additions and 0 deletions

View File

@@ -0,0 +1,155 @@
#include <gtest/gtest.h>
#ifdef XRPL_ENABLE_TELEMETRY
#include <xrpl/telemetry/TraceContextPropagator.h>
#include <opentelemetry/context/context.h>
#include <opentelemetry/nostd/span.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>
#include <cstring>
namespace trace = opentelemetry::trace;
/**
 * Round-trip check: a sampled, remote span context is injected into a
 * protocol::TraceContext message and then extracted again. Both the
 * serialized protobuf fields and the reconstructed span context are compared
 * against the original identifiers.
 */
TEST(TraceContextPropagator, round_trip)
{
    namespace nostd = opentelemetry::nostd;

    // Every byte is distinct so that truncation or reordering would be visible.
    std::uint8_t rawTraceId[16] = {
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
        0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
    std::uint8_t rawSpanId[8] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x11, 0x22};

    trace::TraceId expectedTraceId(nostd::span<std::uint8_t const, 16>(rawTraceId, 16));
    trace::SpanId expectedSpanId(nostd::span<std::uint8_t const, 8>(rawSpanId, 8));
    trace::TraceFlags sampledFlags(trace::TraceFlags::kIsSampled);

    // Fourth argument marks the span context as remote.
    trace::SpanContext sourceSpanCtx(expectedTraceId, expectedSpanId, sampledFlags, true);

    // Wrap the span context in a DefaultSpan and store it under the span key.
    auto sourceCtx = opentelemetry::context::Context{}.SetValue(
        trace::kSpanKey, nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(sourceSpanCtx)));

    // --- inject -----------------------------------------------------------
    protocol::TraceContext wire;
    xrpl::telemetry::injectToProtobuf(sourceCtx, wire);

    // Trace id: present, 16 bytes, byte-for-byte identical.
    EXPECT_TRUE(wire.has_trace_id());
    EXPECT_EQ(wire.trace_id().size(), 16u);
    EXPECT_EQ(std::memcmp(wire.trace_id().data(), rawTraceId, 16), 0);

    // Span id: present, 8 bytes, byte-for-byte identical.
    EXPECT_TRUE(wire.has_span_id());
    EXPECT_EQ(wire.span_id().size(), 8u);
    EXPECT_EQ(std::memcmp(wire.span_id().data(), rawSpanId, 8), 0);

    // Flags carry the sampled bit.
    EXPECT_EQ(wire.trace_flags(), static_cast<uint32_t>(trace::TraceFlags::kIsSampled));

    // --- extract ----------------------------------------------------------
    auto restoredCtx = xrpl::telemetry::extractFromProtobuf(wire);
    auto restoredSpan = trace::GetSpan(restoredCtx);
    ASSERT_NE(restoredSpan, nullptr);

    auto const restored = restoredSpan->GetContext();
    EXPECT_TRUE(restored.IsValid());
    EXPECT_TRUE(restored.IsRemote());
    EXPECT_EQ(restored.trace_id(), expectedTraceId);
    EXPECT_EQ(restored.span_id(), expectedSpanId);
    EXPECT_TRUE(restored.trace_flags().IsSampled());
}
/**
 * Extracting from a default-constructed protocol::TraceContext (no fields
 * set) must not yield a valid span context.
 */
TEST(TraceContextPropagator, extract_empty_protobuf)
{
    protocol::TraceContext proto;
    auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(ctx);

    // Assert instead of guarding with `if (span)`: a null span must fail the
    // test rather than silently skip its only expectation. This mirrors the
    // null check used by the round_trip test.
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().IsValid());
}
/**
 * A trace_id of 8 bytes (the round_trip test uses 16) paired with an 8-byte
 * span_id must not yield a valid span context on extraction.
 */
TEST(TraceContextPropagator, extract_wrong_size_trace_id)
{
    protocol::TraceContext proto;
    proto.set_trace_id(std::string(8, '\x01'));  // too short: 8 bytes
    proto.set_span_id(std::string(8, '\xaa'));   // 8 bytes, as in round_trip

    auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(ctx);

    // Assert instead of guarding with `if (span)`: a null span must fail the
    // test rather than silently skip its only expectation.
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().IsValid());
}
/**
 * A 16-byte trace_id paired with a span_id of 4 bytes (the round_trip test
 * uses 8) must not yield a valid span context on extraction.
 */
TEST(TraceContextPropagator, extract_wrong_size_span_id)
{
    protocol::TraceContext proto;
    proto.set_trace_id(std::string(16, '\x01'));  // 16 bytes, as in round_trip
    proto.set_span_id(std::string(4, '\xaa'));    // too short: 4 bytes

    auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(ctx);

    // Assert instead of guarding with `if (span)`: a null span must fail the
    // test rather than silently skip its only expectation.
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().IsValid());
}
/**
 * Injecting from a context that holds no span must leave the trace_id and
 * span_id fields of the protobuf message unset.
 */
TEST(TraceContextPropagator, inject_invalid_span)
{
    // Default-constructed context: no span has been stored in it.
    opentelemetry::context::Context emptyCtx{};

    protocol::TraceContext wire;
    xrpl::telemetry::injectToProtobuf(emptyCtx, wire);

    EXPECT_FALSE(wire.has_trace_id());
    EXPECT_FALSE(wire.has_span_id());
}
/**
 * Trace flags of 0 (not sampled) must be written to the protobuf message as
 * 0 and must still read back as "not sampled" after extraction.
 */
TEST(TraceContextPropagator, flags_preservation)
{
    namespace nostd = opentelemetry::nostd;

    std::uint8_t rawTraceId[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    std::uint8_t rawSpanId[8] = {1, 2, 3, 4, 5, 6, 7, 8};

    // Flags value 0: the sampled bit is clear.
    trace::TraceFlags unsampledFlags(0);

    trace::TraceId sourceTraceId(nostd::span<std::uint8_t const, 16>(rawTraceId, 16));
    trace::SpanId sourceSpanId(nostd::span<std::uint8_t const, 8>(rawSpanId, 8));
    trace::SpanContext sourceSpanCtx(sourceTraceId, sourceSpanId, unsampledFlags, true);

    auto sourceCtx = opentelemetry::context::Context{}.SetValue(
        trace::kSpanKey, nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(sourceSpanCtx)));

    // Inject: the serialized flags must be exactly zero.
    protocol::TraceContext wire;
    xrpl::telemetry::injectToProtobuf(sourceCtx, wire);
    EXPECT_EQ(wire.trace_flags(), 0u);

    // Extract: the restored span must not report itself as sampled.
    auto restoredCtx = xrpl::telemetry::extractFromProtobuf(wire);
    auto restoredSpan = trace::GetSpan(restoredCtx);
    ASSERT_NE(restoredSpan, nullptr);
    EXPECT_FALSE(restoredSpan->GetContext().trace_flags().IsSampled());
}
#else // XRPL_ENABLE_TELEMETRY not defined
// Placeholder compiled only when XRPL_ENABLE_TELEMETRY is not defined: it
// registers a single trivially passing test in the TraceContextPropagator
// suite, since the real tests above are excluded by the preprocessor.
TEST(TraceContextPropagator, compiles_without_telemetry)
{
SUCCEED();
}
#endif // XRPL_ENABLE_TELEMETRY

View File

@@ -114,6 +114,7 @@
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/Manifest.h>
#include <xrpl/shamap/SHAMap.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/tx/apply.h>
#include <boost/asio/error.hpp>
@@ -1311,6 +1312,11 @@ NetworkOPsImp::processTransaction(
bool bLocal,
FailHard failType)
{
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Transactions, "tx", "process");
span.setAttribute("xrpl.tx.hash", to_string(transaction->getID()).c_str());
span.setAttribute("xrpl.tx.local", bLocal);
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
// preProcessTransaction can change our pointer
@@ -1319,10 +1325,12 @@ NetworkOPsImp::processTransaction(
if (bLocal)
{
span.setAttribute("xrpl.tx.path", "sync");
doTransactionSync(transaction, bUnlimited, failType);
}
else
{
span.setAttribute("xrpl.tx.path", "async");
doTransactionAsync(transaction, bUnlimited, failType);
}
}

View File

@@ -62,6 +62,7 @@
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/shamap/SHAMapNodeID.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/tx/apply.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -1421,6 +1422,12 @@ PeerImp::handleTransaction(
bool eraseTxQueue,
bool batch)
{
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Transactions, "tx", "receive");
span.setAttribute("xrpl.peer.id", static_cast<int64_t>(id_));
if (auto const version = getVersion(); !version.empty())
span.setAttribute("xrpl.peer.version", version.c_str());
XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs"));
if (tracking_.load() == Tracking::diverged)
return;
@@ -1439,6 +1446,7 @@ PeerImp::handleTransaction(
{
auto stx = std::make_shared<STTx const>(sit);
uint256 const txID = stx->getTransactionID();
span.setAttribute("xrpl.tx.hash", to_string(txID).c_str());
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
@@ -1472,9 +1480,11 @@ PeerImp::handleTransaction(
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
{
span.setAttribute("xrpl.tx.suppressed", true);
// we have seen this transaction recently
if (any(flags & HashRouterFlags::BAD))
{
span.setAttribute("xrpl.tx.status", "known_bad");
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}