feat(telemetry): add cross-node trace context propagation

Wire trace context into P2P message flow so distributed traces
link across nodes. TX relay injects SpanGuard context via
PropagationHelpers.h; consensus propose/validate injects via
TraceContextPropagator.h. Receive-side extraction in PeerImp
creates child spans for proposals and validations.

- Add TraceBytes struct and SpanGuard::getTraceBytes() for
  extracting raw trace context without OTel type dependencies
- Add PropagationHelpers.h: injectSpanContext(SpanGuard, proto)
- Add ConsensusReceiveTracing.h: proposalReceiveSpan(),
  validationReceiveSpan() with parent context extraction
- NetworkOPs::apply(): inject tx.process context before relay
- RCLConsensus::propose()/validate(): inject active span context
- PeerImp: create receive spans for proposals and validations
  with sender's trace context as parent

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-04-29 14:21:32 +01:00
parent 0012f52940
commit 654fe2d30f
11 changed files with 359 additions and 28 deletions

View File

@@ -309,6 +309,26 @@ SpanGuard::captureContext() const
return SpanContext(std::make_shared<SpanContext::Impl>(ctx));
}
TraceBytes
SpanGuard::getTraceBytes() const
{
if (!impl_ || !impl_->span)
return {};
auto const& spanCtx = impl_->span->GetContext();
if (!spanCtx.IsValid())
return {};
TraceBytes result;
auto const& tid = spanCtx.trace_id();
std::memcpy(result.traceId.data(), tid.Id().data(), 16);
auto const& sid = spanCtx.span_id();
std::memcpy(result.spanId.data(), sid.Id().data(), 8);
result.traceFlags = spanCtx.trace_flags().flags();
result.valid = true;
return result;
}
// ===== Attribute setters ===================================================
void

View File

@@ -62,9 +62,14 @@
#include <xrpl/shamap/SHAMapItem.h>
#include <xrpl/shamap/SHAMapMissingNode.h>
#include <xrpl/shamap/SHAMapTreeNode.h>
#include <xrpl/telemetry/TraceContextPropagator.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#ifdef XRPL_ENABLE_TELEMETRY
#include <opentelemetry/context/runtime_context.h>
#endif
#include <xrpl.pb.h>
#include <algorithm>
@@ -261,6 +266,16 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
app_.getHashRouter().addSuppression(suppression);
// Inject the current thread's active span context (e.g. the
// consensus round span from Phase 4) so receiving peers can link
// their proposal.receive span as a child of this trace.
#ifdef XRPL_ENABLE_TELEMETRY
{
auto ctx = opentelemetry::context::RuntimeContext::GetCurrent();
telemetry::injectToProtobuf(ctx, *prop.mutable_trace_context());
}
#endif
app_.getOverlay().broadcast(prop);
}
@@ -881,6 +896,14 @@ RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns,
// Broadcast to all our peers:
protocol::TMValidation val;
val.set_validation(serialized.data(), serialized.size());
// Inject the current thread's active span context so receiving
// peers can link their validation.receive span as a child.
#ifdef XRPL_ENABLE_TELEMETRY
{
auto ctx = opentelemetry::context::RuntimeContext::GetCurrent();
telemetry::injectToProtobuf(ctx, *val.mutable_trace_context());
}
#endif
app_.getOverlay().broadcast(val);
// Publish to all our subscribers:

View File

@@ -35,6 +35,7 @@
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpld/telemetry/PropagationHelpers.h>
#include <xrpld/telemetry/TxTracing.h>
#include <xrpl/basics/Log.h>
@@ -1703,6 +1704,10 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
tx.set_receivetimestamp(
registry_.get().getTimeKeeper().now().time_since_epoch().count());
tx.set_deferred(e.result == terQUEUED);
// Inject the tx.process span's trace context so the
// receiving node can link its tx.receive span as a child.
if (e.span && *e.span)
telemetry::injectSpanContext(*e.span, *tx.mutable_trace_context());
// FIXME: This should be when we received it
registry_.get().getOverlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();

View File

@@ -5,14 +5,14 @@
* Used by PeerImp (overlay) and NetworkOPs (app) for transaction
* lifecycle spans. Built on StaticStr/join() from SpanNames.h.
*
* Span hierarchy:
* Span hierarchy (cross-node propagation):
*
* Node A (sender) Node B (receiver)
* +------------------+ +------------------+
* | tx.process | protobuf | tx.receive |
* | injectTo | ---------> | extractFrom |
* | Protobuf() | trace_ctx | Protobuf() |
* +------------------+ +------------------+
* Node A (sender) Node B (receiver)
* +---------------------+ +---------------------+
* | tx.process | protobuf | tx.receive |
* | injectSpanContext | ---------> | txReceiveSpan() |
* | (PropagationHelp.) | trace_ctx | extracts parent |
* +---------------------+ +---------------------+
*/
#include <xrpl/telemetry/SpanNames.h>

View File

@@ -22,6 +22,7 @@
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/peerfinder/PeerfinderManager.h>
#include <xrpld/peerfinder/Slot.h>
#include <xrpld/telemetry/ConsensusReceiveTracing.h>
#include <xrpld/telemetry/TxTracing.h>
#include <xrpl/basics/Log.h>
@@ -1958,9 +1959,17 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
app_.getTimeKeeper().closeTime(),
calcNodeID(app_.getValidatorManifests().getMasterKey(publicKey))});
// Create a receive span that links to the sender's trace context
// (if propagated). shared_ptr keeps it alive across the job boundary.
auto span = std::make_shared<telemetry::SpanGuard>(telemetry::proposalReceiveSpan(set));
span->setAttribute("xrpl.consensus.trusted", isTrusted);
span->setAttribute("xrpl.consensus.round", static_cast<int64_t>(set.proposeseq()));
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "checkPropose", [weak, isTrusted, m, proposal]() {
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"checkPropose",
[weak, isTrusted, m, proposal, sp = std::move(span)]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
});
@@ -2535,6 +2544,17 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
// Create a receive span that links to the sender's trace context
// (if propagated). shared_ptr keeps it alive across the job boundary.
auto span = std::make_shared<telemetry::SpanGuard>(telemetry::validationReceiveSpan(*m));
span->setAttribute("xrpl.consensus.trusted", isTrusted);
if (val->isFieldPresent(sfLedgerSequence))
{
span->setAttribute(
"xrpl.consensus.ledger.seq",
static_cast<int64_t>(val->getFieldU32(sfLedgerSequence)));
}
if (!isTrusted && (tracking_.load() == Tracking::diverged))
{
JLOG(p_journal_.debug()) << "Dropping untrusted validation from diverged peer";
@@ -2545,7 +2565,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, name, [weak, val, m, key]() {
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
name,
[weak, val, m, key, sp = std::move(span)]() {
if (auto peer = weak.lock())
peer->checkValidation(val, key, m);
});

View File

@@ -0,0 +1,127 @@
#pragma once
/** Helper functions for creating consensus receive trace spans.
*
* Encapsulates the logic for creating SpanGuard instances for incoming
* proposal and validation messages with optional protobuf parent
* extraction. When the incoming message carries a TraceContext with a
* valid span_id, the receive span is created as a child of the
* sender's span, enabling cross-node trace correlation.
*
* Dependency diagram:
*
* protocol::TMProposeSet / TMValidation
* |
* v
* proposalReceiveSpan() / validationReceiveSpan()
* |
* +--- has trace_context? ----+
* | yes | no
* v v
* SpanGuard::span() with SpanGuard::span()
* extracted parent context (standalone span)
*
* When XRPL_ENABLE_TELEMETRY is not defined, the functions return
* no-op SpanGuard instances (zero overhead, zero dependencies).
*
* Usage:
* @code
* // In PeerImp::onMessage(TMProposeSet):
* auto span = telemetry::proposalReceiveSpan(*m);
* span.setAttribute(...);
* @endcode
*
* @note These span names use inline string_view literals. When
* ConsensusSpanNames.h (from Phase 4) is available, callers should
* migrate to using the constexpr constants defined there.
*/
#include <xrpl/proto/xrpl.pb.h>
#include <xrpl/telemetry/SpanGuard.h>
namespace xrpl {
namespace telemetry {
// Inline span name constants for consensus receive spans.
// Phase 4 will provide these via ConsensusSpanNames.h; these are
// temporary definitions for the propagation infrastructure.
namespace detail {
inline constexpr std::string_view proposalReceiveName = "consensus.proposal.receive";
inline constexpr std::string_view validationReceiveName = "consensus.validation.receive";
} // namespace detail
/** Create a "consensus.proposal.receive" span for an incoming proposal.
*
* If the message carries a TraceContext with a valid span_id, the
* receive span is created with the sender's context as parent.
* Otherwise a standalone span is created.
*
* @param msg The incoming TMProposeSet protobuf message.
* @return An active SpanGuard, or a null guard if tracing is disabled.
*/
inline SpanGuard
proposalReceiveSpan([[maybe_unused]] protocol::TMProposeSet const& msg)
{
#ifdef XRPL_ENABLE_TELEMETRY
if (msg.has_trace_context())
{
auto const& tc = msg.trace_context();
if (tc.has_span_id() && tc.span_id().size() == 8 && tc.has_trace_id() &&
tc.trace_id().size() == 16)
{
// Create a child span using the sender's trace_id and
// span_id as parent. Use hashSpan with the sender's
// trace_id so the receiving span shares the same trace.
return SpanGuard::hashSpan(
TraceCategory::Consensus,
detail::proposalReceiveName,
reinterpret_cast<std::uint8_t const*>(tc.trace_id().data()),
tc.trace_id().size(),
reinterpret_cast<std::uint8_t const*>(tc.span_id().data()),
tc.span_id().size(),
tc.has_trace_flags() ? static_cast<std::uint8_t>(tc.trace_flags())
: std::uint8_t{0});
}
}
#endif
// No propagated context — create a standalone span.
return SpanGuard::span(TraceCategory::Consensus, "consensus", "proposal.receive");
}
/** Create a "consensus.validation.receive" span for an incoming validation.
*
* If the message carries a TraceContext with a valid span_id, the
* receive span is created with the sender's context as parent.
* Otherwise a standalone span is created.
*
* @param msg The incoming TMValidation protobuf message.
* @return An active SpanGuard, or a null guard if tracing is disabled.
*/
inline SpanGuard
validationReceiveSpan([[maybe_unused]] protocol::TMValidation const& msg)
{
#ifdef XRPL_ENABLE_TELEMETRY
if (msg.has_trace_context())
{
auto const& tc = msg.trace_context();
if (tc.has_span_id() && tc.span_id().size() == 8 && tc.has_trace_id() &&
tc.trace_id().size() == 16)
{
return SpanGuard::hashSpan(
TraceCategory::Consensus,
detail::validationReceiveName,
reinterpret_cast<std::uint8_t const*>(tc.trace_id().data()),
tc.trace_id().size(),
reinterpret_cast<std::uint8_t const*>(tc.span_id().data()),
tc.span_id().size(),
tc.has_trace_flags() ? static_cast<std::uint8_t>(tc.trace_flags())
: std::uint8_t{0});
}
}
#endif
// No propagated context — create a standalone span.
return SpanGuard::span(TraceCategory::Consensus, "consensus", "validation.receive");
}
} // namespace telemetry
} // namespace xrpl

View File

@@ -0,0 +1,62 @@
#pragma once
/** Helpers for injecting trace context into protobuf messages.
*
* Bridges the gap between SpanGuard (which hides OTel types) and the
* protobuf TraceContext message used for cross-node propagation.
*
* Dependency diagram:
*
* SpanGuard::getTraceBytes() protocol::TraceContext (proto)
* \ /
* +--- TraceBytes -----+
* | |
* injectSpanContext(span, proto)
*
* @note When XRPL_ENABLE_TELEMETRY is disabled, getTraceBytes() returns
* {.valid=false}, so injectSpanContext becomes a no-op with zero overhead.
*
* Usage:
* @code
* // Send side — inject from a SpanGuard reference:
* protocol::TMTransaction tx;
* // ... populate tx fields ...
* injectSpanContext(mySpanGuard, *tx.mutable_trace_context());
* overlay.relay(txID, tx, toSkip);
* @endcode
*
* @see ConsensusReceiveTracing.h for receive-side extraction helpers.
* @see TraceContextPropagator.h for low-level OTel context serialization.
*/
#include <xrpl/proto/xrpl.pb.h>
#include <xrpl/telemetry/SpanGuard.h>
namespace xrpl {
namespace telemetry {
/** Inject trace context from an active SpanGuard into a protobuf
* TraceContext message for cross-node propagation.
*
* Reads the span's trace_id, span_id, and trace_flags via
* getTraceBytes() and writes them into the protobuf fields.
* Safe to call from any thread that holds a reference to the span.
* No-op if the span is null or inactive.
*
* @param span The active SpanGuard whose context to propagate.
* @param proto The protobuf TraceContext to populate.
*/
inline void
injectSpanContext(SpanGuard const& span, protocol::TraceContext& proto)
{
auto const bytes = span.getTraceBytes();
if (!bytes.valid)
return;
proto.set_trace_id(bytes.traceId.data(), bytes.traceId.size());
proto.set_span_id(bytes.spanId.data(), bytes.spanId.size());
proto.set_trace_flags(bytes.traceFlags);
}
} // namespace telemetry
} // namespace xrpl