diff --git a/.github/scripts/levelization/results/ordering.txt b/.github/scripts/levelization/results/ordering.txt index b908b4a64c..3d540797d2 100644 --- a/.github/scripts/levelization/results/ordering.txt +++ b/.github/scripts/levelization/results/ordering.txt @@ -237,6 +237,7 @@ xrpld.app > xrpl.basics xrpld.app > xrpl.core xrpld.app > xrpld.consensus xrpld.app > xrpld.core +xrpld.app > xrpld.telemetry xrpld.app > xrpl.json xrpld.app > xrpl.ledger xrpld.app > xrpl.net @@ -262,6 +263,7 @@ xrpld.overlay > xrpl.core xrpld.overlay > xrpld.consensus xrpld.overlay > xrpld.core xrpld.overlay > xrpld.peerfinder +xrpld.overlay > xrpld.telemetry xrpld.overlay > xrpl.json xrpld.overlay > xrpl.ledger xrpld.overlay > xrpl.protocol diff --git a/docker/telemetry/grafana/provisioning/datasources/tempo.yaml b/docker/telemetry/grafana/provisioning/datasources/tempo.yaml index 198c2550d3..188a5e095b 100644 --- a/docker/telemetry/grafana/provisioning/datasources/tempo.yaml +++ b/docker/telemetry/grafana/provisioning/datasources/tempo.yaml @@ -7,6 +7,7 @@ # Each phase adds filters for the span attributes it introduces. # Phase 1b (infra): Base filters — node identity, service, span name, status. # Phase 2 (RPC): RPC command, status, role filters. +# Phase 3 (TX): Transaction hash, local/peer origin, status. apiVersion: 1 @@ -117,3 +118,19 @@ datasources: operator: "=" scope: span type: dynamic + # Phase 3: Transaction tracing filters + - id: tx-hash + tag: xrpl.tx.hash + operator: "=" + scope: span + type: static + - id: tx-origin + tag: xrpl.tx.local + operator: "=" + scope: span + type: dynamic + - id: tx-status + tag: xrpl.tx.status + operator: "=" + scope: span + type: dynamic diff --git a/include/xrpl/proto/xrpl.proto b/include/xrpl/proto/xrpl.proto index d49920201e..56f4dafc80 100644 --- a/include/xrpl/proto/xrpl.proto +++ b/include/xrpl/proto/xrpl.proto @@ -85,6 +85,15 @@ message TMPublicKey { // If you want to send an amount that is greater than any single address of yours // you must first combine coins from one address to another. +// Trace context for OpenTelemetry distributed tracing across nodes. +// Uses W3C Trace Context format internally. +message TraceContext { + optional bytes trace_id = 1; // 16-byte trace identifier + optional bytes span_id = 2; // 8-byte parent span identifier + optional uint32 trace_flags = 3; // bit 0 = sampled + optional string trace_state = 4; // W3C tracestate header value +} + enum TransactionStatus { tsNEW = 1; // origin node did/could not validate tsCURRENT = 2; // scheduled to go in this ledger @@ -101,6 +110,9 @@ message TMTransaction { required TransactionStatus status = 2; optional uint64 receiveTimestamp = 3; optional bool deferred = 4; // not applied to open ledger + + // Optional trace context for OpenTelemetry distributed tracing + optional TraceContext trace_context = 1001; } message TMTransactions { @@ -149,6 +161,9 @@ message TMProposeSet { // Number of hops traveled optional uint32 hops = 12 [deprecated = true]; + + // Optional trace context for OpenTelemetry distributed tracing + optional TraceContext trace_context = 1001; } enum TxSetStatus { @@ -194,6 +209,9 @@ message TMValidation { // Number of hops traveled optional uint32 hops = 3 [deprecated = true]; + + // Optional trace context for OpenTelemetry distributed tracing + optional TraceContext trace_context = 1001; } // An array of Endpoint messages diff --git a/include/xrpl/telemetry/TraceContextPropagator.h b/include/xrpl/telemetry/TraceContextPropagator.h new file mode 100644 index 0000000000..b897541267 --- /dev/null +++ b/include/xrpl/telemetry/TraceContextPropagator.h @@ -0,0 +1,94 @@ +#pragma once + +/** Utilities for trace context propagation across nodes. + + Provides serialization/deserialization of OTel trace context to/from + Protocol Buffer TraceContext messages (P2P cross-node propagation). + + Only compiled when XRPL_ENABLE_TELEMETRY is defined. +*/ + +#ifdef XRPL_ENABLE_TELEMETRY + +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace xrpl { +namespace telemetry { + +/** Extract OTel context from a protobuf TraceContext message. + + @param proto The protobuf TraceContext received from a peer. + @return An OTel Context with the extracted parent span, or an empty + context if the protobuf fields are missing or invalid. +*/ +inline opentelemetry::context::Context +extractFromProtobuf(protocol::TraceContext const& proto) +{ + namespace trace = opentelemetry::trace; + + if (!proto.has_trace_id() || proto.trace_id().size() != 16 || !proto.has_span_id() || + proto.span_id().size() != 8) + { + return opentelemetry::context::Context{}; + } + + auto const* rawTraceId = reinterpret_cast(proto.trace_id().data()); + auto const* rawSpanId = reinterpret_cast(proto.span_id().data()); + trace::TraceId traceId(opentelemetry::nostd::span(rawTraceId, 16)); + trace::SpanId spanId(opentelemetry::nostd::span(rawSpanId, 8)); + // Default to not-sampled (0x00) per W3C Trace Context spec when + // the trace_flags field is absent. + trace::TraceFlags flags( + proto.has_trace_flags() ? static_cast(proto.trace_flags()) + : static_cast(0)); + + trace::SpanContext spanCtx(traceId, spanId, flags, /* remote = */ true); + + return opentelemetry::context::Context{}.SetValue( + trace::kSpanKey, + opentelemetry::nostd::shared_ptr(new trace::DefaultSpan(spanCtx))); +} + +/** Inject the current span's trace context into a protobuf TraceContext. + + @param ctx The OTel context containing the span to propagate. + @param proto The protobuf TraceContext to populate. +*/ +inline void +injectToProtobuf(opentelemetry::context::Context const& ctx, protocol::TraceContext& proto) +{ + namespace trace = opentelemetry::trace; + + auto span = trace::GetSpan(ctx); + if (!span) + return; + + auto const& spanCtx = span->GetContext(); + if (!spanCtx.IsValid()) + return; + + // Serialize trace_id (16 bytes) + auto const& traceId = spanCtx.trace_id(); + proto.set_trace_id(traceId.Id().data(), trace::TraceId::kSize); + + // Serialize span_id (8 bytes) + auto const& spanId = spanCtx.span_id(); + proto.set_span_id(spanId.Id().data(), trace::SpanId::kSize); + + // Serialize flags + proto.set_trace_flags(spanCtx.trace_flags().flags()); +} + +} // namespace telemetry +} // namespace xrpl + +#endif // XRPL_ENABLE_TELEMETRY diff --git a/src/tests/libxrpl/telemetry/TraceContextPropagator.cpp b/src/tests/libxrpl/telemetry/TraceContextPropagator.cpp new file mode 100644 index 0000000000..a8390bf768 --- /dev/null +++ b/src/tests/libxrpl/telemetry/TraceContextPropagator.cpp @@ -0,0 +1,155 @@ +#include + +#ifdef XRPL_ENABLE_TELEMETRY + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace trace = opentelemetry::trace; + +TEST(TraceContextPropagator, round_trip) +{ + std::uint8_t traceIdBuf[16] = { + 0x01, + 0x02, + 0x03, + 0x04, + 0x05, + 0x06, + 0x07, + 0x08, + 0x09, + 0x0a, + 0x0b, + 0x0c, + 0x0d, + 0x0e, + 0x0f, + 0x10}; + std::uint8_t spanIdBuf[8] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x11, 0x22}; + + trace::TraceId traceId(opentelemetry::nostd::span(traceIdBuf, 16)); + trace::SpanId spanId(opentelemetry::nostd::span(spanIdBuf, 8)); + trace::TraceFlags flags(trace::TraceFlags::kIsSampled); + trace::SpanContext spanCtx(traceId, spanId, flags, true); + + auto ctx = opentelemetry::context::Context{}.SetValue( + trace::kSpanKey, + opentelemetry::nostd::shared_ptr(new trace::DefaultSpan(spanCtx))); + + protocol::TraceContext proto; + xrpl::telemetry::injectToProtobuf(ctx, proto); + + EXPECT_TRUE(proto.has_trace_id()); + EXPECT_EQ(proto.trace_id().size(), 16u); + EXPECT_TRUE(proto.has_span_id()); + EXPECT_EQ(proto.span_id().size(), 8u); + EXPECT_EQ(proto.trace_flags(), static_cast(trace::TraceFlags::kIsSampled)); + EXPECT_EQ(std::memcmp(proto.trace_id().data(), traceIdBuf, 16), 0); + EXPECT_EQ(std::memcmp(proto.span_id().data(), spanIdBuf, 8), 0); + + auto extractedCtx = xrpl::telemetry::extractFromProtobuf(proto); + auto extractedSpan = trace::GetSpan(extractedCtx); + ASSERT_NE(extractedSpan, nullptr); + + auto const& extracted = extractedSpan->GetContext(); + EXPECT_TRUE(extracted.IsValid()); + EXPECT_TRUE(extracted.IsRemote()); + EXPECT_EQ(extracted.trace_id(), traceId); + EXPECT_EQ(extracted.span_id(), spanId); + EXPECT_TRUE(extracted.trace_flags().IsSampled()); +} + +TEST(TraceContextPropagator, extract_empty_protobuf) +{ + protocol::TraceContext proto; + auto ctx = xrpl::telemetry::extractFromProtobuf(proto); + auto span = trace::GetSpan(ctx); + if (span) + { + EXPECT_FALSE(span->GetContext().IsValid()); + } +} + +TEST(TraceContextPropagator, extract_wrong_size_trace_id) +{ + protocol::TraceContext proto; + proto.set_trace_id(std::string(8, '\x01')); + proto.set_span_id(std::string(8, '\xaa')); + + auto ctx = xrpl::telemetry::extractFromProtobuf(proto); + auto span = trace::GetSpan(ctx); + if (span) + { + EXPECT_FALSE(span->GetContext().IsValid()); + } +} + +TEST(TraceContextPropagator, extract_wrong_size_span_id) +{ + protocol::TraceContext proto; + proto.set_trace_id(std::string(16, '\x01')); + proto.set_span_id(std::string(4, '\xaa')); + + auto ctx = xrpl::telemetry::extractFromProtobuf(proto); + auto span = trace::GetSpan(ctx); + if (span) + { + EXPECT_FALSE(span->GetContext().IsValid()); + } +} + +TEST(TraceContextPropagator, inject_invalid_span) +{ + auto ctx = opentelemetry::context::Context{}; + protocol::TraceContext proto; + xrpl::telemetry::injectToProtobuf(ctx, proto); + + EXPECT_FALSE(proto.has_trace_id()); + EXPECT_FALSE(proto.has_span_id()); +} + +TEST(TraceContextPropagator, flags_preservation) +{ + std::uint8_t traceIdBuf[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; + std::uint8_t spanIdBuf[8] = {1, 2, 3, 4, 5, 6, 7, 8}; + + // Test with flags NOT sampled (flags = 0) + trace::TraceFlags flags(0); + trace::SpanContext spanCtx( + trace::TraceId(opentelemetry::nostd::span(traceIdBuf, 16)), + trace::SpanId(opentelemetry::nostd::span(spanIdBuf, 8)), + flags, + true); + + auto ctx = opentelemetry::context::Context{}.SetValue( + trace::kSpanKey, + opentelemetry::nostd::shared_ptr(new trace::DefaultSpan(spanCtx))); + + protocol::TraceContext proto; + xrpl::telemetry::injectToProtobuf(ctx, proto); + EXPECT_EQ(proto.trace_flags(), 0u); + + auto extracted = xrpl::telemetry::extractFromProtobuf(proto); + auto span = trace::GetSpan(extracted); + ASSERT_NE(span, nullptr); + EXPECT_FALSE(span->GetContext().trace_flags().IsSampled()); +} + +#else // XRPL_ENABLE_TELEMETRY not defined + +TEST(TraceContextPropagator, compiles_without_telemetry) +{ + SUCCEED(); +} + +#endif // XRPL_ENABLE_TELEMETRY diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index 8de65d8b39..33c2b04d36 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -114,6 +114,7 @@ #include #include #include +#include #include #include @@ -1311,6 +1312,11 @@ NetworkOPsImp::processTransaction( bool bLocal, FailHard failType) { + using namespace telemetry; + auto span = SpanGuard::span(TraceCategory::Transactions, "tx", "process"); + span.setAttribute("xrpl.tx.hash", to_string(transaction->getID()).c_str()); + span.setAttribute("xrpl.tx.local", bLocal); + auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN"); // preProcessTransaction can change our pointer @@ -1319,10 +1325,12 @@ NetworkOPsImp::processTransaction( if (bLocal) { + span.setAttribute("xrpl.tx.path", "sync"); doTransactionSync(transaction, bUnlimited, failType); } else { + span.setAttribute("xrpl.tx.path", "async"); doTransactionAsync(transaction, bUnlimited, failType); } } diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 46a640ec5c..8902749f92 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -62,6 +62,7 @@ #include #include #include +#include #include #include @@ -1421,6 +1422,12 @@ PeerImp::handleTransaction( bool eraseTxQueue, bool batch) { + using namespace telemetry; + auto span = SpanGuard::span(TraceCategory::Transactions, "tx", "receive"); + span.setAttribute("xrpl.peer.id", static_cast(id_)); + if (auto const version = getVersion(); !version.empty()) + span.setAttribute("xrpl.peer.version", version.c_str()); + XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs")); if (tracking_.load() == Tracking::diverged) return; @@ -1439,6 +1446,7 @@ PeerImp::handleTransaction( { auto stx = std::make_shared(sit); uint256 const txID = stx->getTransactionID(); + span.setAttribute("xrpl.tx.hash", to_string(txID).c_str()); // Charge strongly for attempting to relay a txn with tfInnerBatchTxn // LCOV_EXCL_START @@ -1472,9 +1480,11 @@ PeerImp::handleTransaction( if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval)) { + span.setAttribute("xrpl.tx.suppressed", true); // we have seen this transaction recently if (any(flags & HashRouterFlags::BAD)) { + span.setAttribute("xrpl.tx.status", "known_bad"); fee_.update(Resource::feeUselessData, "known bad"); JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; }