addressed code review comments

Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
This commit is contained in:
Pratik Mankawde
2026-05-28 15:55:25 +01:00
parent 43258e8dc0
commit c6c019ed8b
12 changed files with 177 additions and 45 deletions

View File

@@ -89,7 +89,7 @@
- In `onMessage(TMTransaction)` / `handleTransaction()`:
- Extract parent trace context from incoming `TMTransaction::trace_context` field (if present)
- Create `tx.receive` span as child of extracted context (or new root if none)
- Set attributes: `xrpl.tx.hash`, `xrpl.peer.id`, `tx_status`
- Set attributes: `tx_hash`, `peer_id`, `tx_status`
- On HashRouter suppression (duplicate): set `suppressed=true`, add `tx.duplicate` event
- Wrap validation call with child span `tx.validate`
- Wrap relay with `tx.relay` span
@@ -121,7 +121,7 @@
- Edit `src/xrpld/app/misc/NetworkOPs.cpp`:
- In `processTransaction()`:
- Create `tx.process` span
- Set attributes: `xrpl.tx.hash`, `tx_type`, `local` (whether from RPC or peer)
- Set attributes: `tx_hash`, `tx_type`, `local` (whether from RPC or peer)
- Record whether sync or async path is taken
- In `doTransactionAsync()`:
@@ -256,7 +256,7 @@
**What to do**:
- Edit `src/xrpld/overlay/detail/PeerImp.cpp`:
- In the `tx.receive` span block (after existing `xrpl.peer.id` setAttribute call):
- In the `tx.receive` span block (after existing `peer_id` setAttribute call):
- Add `peer_version` (string) — from `this->getVersion()`
- Only set if `getVersion()` returns a non-empty string (avoid empty-string attributes)

View File

@@ -109,7 +109,7 @@ datasources:
type: dynamic
# Phase 3: Transaction tracing filters
- id: tx-hash
tag: xrpl.tx.hash
tag: tx_hash
operator: "="
scope: span
type: static

View File

@@ -87,11 +87,22 @@ message TMPublicKey {
// Trace context for OpenTelemetry distributed tracing across nodes.
// Uses W3C Trace Context format internally.
//
// Field numbering note: this message is embedded as field 1001 on
// TMTransaction, TMProposeSet, and TMValidation. Field numbers >= 1000
// are reserved for optional, observability-only additions that must not
// collide with protocol-semantic fields (which historically use 1-99).
// Older peers that do not understand field 1001 will simply ignore it
// per protobuf wire-format rules, preserving backwards compatibility.
//
// trace_state is reserved for future use (secure tracing pipeline,
// OpenTelemetryPlan/secure-OTel.md). It is currently neither populated
// on inject nor read on extract; consumers must not rely on it.
message TraceContext {
optional bytes trace_id = 1; // 16-byte trace identifier
optional bytes span_id = 2; // 8-byte parent span identifier
optional uint32 trace_flags = 3; // bit 0 = sampled
optional string trace_state = 4; // W3C tracestate header value
optional string trace_state = 4; // RESERVED — see TraceContext header note
}
enum TransactionStatus {

View File

@@ -275,10 +275,10 @@ public:
*/
static SpanGuard
hashSpan(
TraceCategory cat,
std::string_view name,
std::uint8_t const* hashData,
std::size_t hashSize);
TraceCategory const cat,
std::string_view const name,
std::uint8_t const* const hashData,
std::size_t const hashSize);
/** Create a hash-derived span with a remote parent.
trace_id = hashData[0:16], parent span_id from protobuf context
@@ -294,13 +294,13 @@ public:
*/
static SpanGuard
hashSpan(
TraceCategory cat,
std::string_view name,
std::uint8_t const* hashData,
std::size_t hashSize,
std::uint8_t const* parentSpanId,
std::size_t parentSpanSize,
std::uint8_t traceFlags);
TraceCategory const cat,
std::string_view const name,
std::uint8_t const* const hashData,
std::size_t const hashSize,
std::uint8_t const* const parentSpanId,
std::size_t const parentSpanSize,
std::uint8_t const traceFlags);
// --- Context capture -----------------------------------------------

View File

@@ -109,11 +109,17 @@ inline constexpr auto networkId = join(join(seg::xrpl, seg::network), makeStr("i
inline constexpr auto networkType = join(join(seg::xrpl, seg::network), makeStr("type"));
inline constexpr auto linkType = makeStr("link_type");
/// Canonical shared attrs (rule 5 — kept xrpl.<domain>.* form).
/// Defined once here, aliased by domain-specific headers.
inline constexpr auto txHash = join(join(seg::xrpl, seg::tx), makeStr("hash"));
inline constexpr auto peerId = join(join(seg::xrpl, seg::peer), makeStr("id"));
inline constexpr auto ledgerSeq = join(join(seg::xrpl, seg::ledger), makeStr("seq"));
/// Canonical shared attrs (rule 5 — <domain>_<field> underscore form).
///
/// Per the naming convention header note: shared cross-span attribute
/// keys use the underscore form, reserving the dotted xrpl.<domain>.<field>
/// form for resource attributes set on the OTel resource at startup.
/// Defined once here, aliased by domain-specific headers. These are
/// literal underscore-joined names, not dot-joined via `join()`, since
/// `join()` always inserts `.` between its arguments.
inline constexpr auto txHash = makeStr("tx_hash");
inline constexpr auto peerId = makeStr("peer_id");
inline constexpr auto ledgerSeq = makeStr("ledger_seq");
} // namespace attr
// ===== Shared attribute values =============================================

View File

@@ -91,6 +91,14 @@ injectToProtobuf(opentelemetry::context::Context const& ctx, protocol::TraceCont
// Serialize flags
proto.set_trace_flags(spanCtx.trace_flags().flags());
// TODO(observability/secure-OTel): the protobuf TraceContext message
// also carries `trace_state` (field 4), which is currently neither
// populated here nor read by extractFromProtobuf above. The field is
// reserved for the secure tracing pipeline outlined in
// OpenTelemetryPlan/secure-OTel.md, where an authenticated token in
// tracestate will let receivers reject spoofed/poisoned trace context.
// Wire trace_state through inject/extract once the consumer lands.
}
} // namespace telemetry

View File

@@ -264,10 +264,10 @@ SpanGuard::linkedSpan(std::string_view name, SpanContext const& linkCtx)
SpanGuard
SpanGuard::hashSpan(
TraceCategory cat,
std::string_view name,
std::uint8_t const* hashData,
std::size_t hashSize)
TraceCategory const cat,
std::string_view const name,
std::uint8_t const* const hashData,
std::size_t const hashSize)
{
if (hashSize < 16)
return {};
@@ -295,13 +295,13 @@ SpanGuard::hashSpan(
SpanGuard
SpanGuard::hashSpan(
TraceCategory cat,
std::string_view name,
std::uint8_t const* hashData,
std::size_t hashSize,
std::uint8_t const* parentSpanId,
std::size_t parentSpanSize,
std::uint8_t traceFlags)
TraceCategory const cat,
std::string_view const name,
std::uint8_t const* const hashData,
std::size_t const hashSize,
std::uint8_t const* const parentSpanId,
std::size_t const parentSpanSize,
std::uint8_t const traceFlags)
{
if (hashSize < 16 || parentSpanSize != 8)
return {};

View File

@@ -0,0 +1,67 @@
#pragma once
/** Compile-time span name and attribute constants for consensus tracing.
*
* Used by PeerImp (overlay) and RCLConsensus (consensus) for proposal
* and validation lifecycle spans. Built on StaticStr/join() from
* SpanNames.h and follows the rule-5 underscore form for shared
* cross-span attributes (e.g. `consensus_round`, `ledger_seq`).
*
* Phase 3 introduces the receive-side surface used by PeerImp.
* Phase 4 will extend this with the proposer/validator-side spans
* (`consensus.proposal.send`, `consensus.validation.send`, round
* bookkeeping, etc.).
*
* Span hierarchy (cross-node propagation):
*
* Node A (sender) Node B (receiver)
* +----------------------------+ +-------------------------------+
* | consensus.proposal/...send | proto | consensus.proposal/...receive |
* | inject trace context | -----> | proposalReceiveSpan() / |
* | (RCLConsensus broadcast) | t_ctx | validationReceiveSpan() |
* +----------------------------+ +-------------------------------+
*/
#include <xrpl/telemetry/SpanNames.h>
namespace xrpl::telemetry::consensus_span {
// ===== Span prefixes =======================================================
namespace prefix {
/// "consensus" — root prefix for consensus lifecycle spans.
inline constexpr auto consensus = seg::consensus;
/// "consensus.proposal" — proposal sub-tree.
inline constexpr auto proposal = join(consensus, makeStr("proposal"));
/// "consensus.validation" — validation sub-tree.
inline constexpr auto validation = join(consensus, makeStr("validation"));
} // namespace prefix
// ===== Span operation suffixes =============================================
namespace op {
inline constexpr auto receive = makeStr("receive");
inline constexpr auto send = makeStr("send");
} // namespace op
// ===== Full span names =====================================================
inline constexpr auto proposalReceive = join(prefix::proposal, op::receive);
inline constexpr auto validationReceive = join(prefix::validation, op::receive);
// ===== Attribute keys ======================================================
namespace attr {
/// Canonical shared constants (defined in SpanNames.h).
using ::xrpl::telemetry::attr::ledgerSeq;
/// "trusted" — bare field; whether the proposing/validating node is
/// in our UNL. Used only on consensus spans, no cross-domain collision.
inline constexpr auto trusted = makeStr("trusted");
/// "consensus_round" — propose-sequence within a consensus round
/// (rule-5 underscore form, shared across consensus spans).
inline constexpr auto round = makeStr("consensus_round");
} // namespace attr
} // namespace xrpl::telemetry::consensus_span

View File

@@ -898,6 +898,15 @@ RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns,
val.set_validation(serialized.data(), serialized.size());
// Inject the current thread's active span context so receiving
// peers can link their validation.receive span as a child.
//
// TODO(observability/secure-OTel): the trace_context appended below is
// outside the cryptographic signature on `serialized` and is therefore
// unauthenticated. Receivers cannot prove it was not tampered with by
// a relay. A signed trace context (either folded into the validation
// payload or carried by an authenticated trace_state token) is tracked
// as a follow-up — see PR #6425 discussion r3317273388 and
// OpenTelemetryPlan/secure-OTel.md. Until then, downstream consumers
// must treat the validation trace_context as advisory only.
#ifdef XRPL_ENABLE_TELEMETRY
{
auto ctx = opentelemetry::context::RuntimeContext::GetCurrent();

View File

@@ -388,9 +388,15 @@ public:
* @param transaction Transaction object.
* @param bUnlimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
* @param span Optional tx.process span to keep alive across the
* batch boundary so its context propagates to peers.
*/
void
doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType);
doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType,
std::shared_ptr<telemetry::SpanGuard> span = nullptr);
/**
* For transactions not submitted by a locally connected client, fire and
@@ -1337,7 +1343,7 @@ NetworkOPsImp::processTransaction(
if (bLocal)
{
span->setAttribute(tx_span::attr::path, tx_span::val::sync);
doTransactionSync(transaction, bUnlimited, failType);
doTransactionSync(transaction, bUnlimited, failType, std::move(span));
}
else
{
@@ -1374,13 +1380,14 @@ void
NetworkOPsImp::doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
FailHard failType,
std::shared_ptr<telemetry::SpanGuard> span)
{
std::unique_lock<std::mutex> lock(mMutex);
if (!transaction->getApplying())
{
mTransactions.emplace_back(transaction, bUnlimited, true, failType);
mTransactions.emplace_back(transaction, bUnlimited, true, failType, std::move(span));
transaction->setApplying();
}

View File

@@ -63,6 +63,17 @@ namespace val {
inline constexpr auto sync = makeStr("sync");
inline constexpr auto async = makeStr("async");
inline constexpr auto knownBad = makeStr("known_bad");
/// Transaction was suppressed via HashRouter (duplicate, not flagged bad).
inline constexpr auto suppressed = makeStr("suppressed");
/// Transaction was rejected because it carried tfInnerBatchTxn, which
/// must never appear in network-relayed traffic.
inline constexpr auto rejectedInnerBatch = makeStr("rejected_inner_batch");
/// Transaction was dropped because the validated ledger is too old to
/// confidently apply new transactions (server is out of sync).
inline constexpr auto droppedNoSync = makeStr("dropped_no_sync");
/// Transaction was dropped because the local job queue for jtTRANSACTION
/// is at MAX_TRANSACTIONS — backpressure on the receive side.
inline constexpr auto droppedQueueFull = makeStr("dropped_queue_full");
} // namespace val
} // namespace xrpl::telemetry::tx_span

View File

@@ -1,5 +1,6 @@
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/app/consensus/ConsensusSpanNames.h>
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/InboundLedgers.h>
@@ -1474,6 +1475,7 @@ PeerImp::handleTransaction(
*/
if (stx->isFlag(tfInnerBatchTxn))
{
span->setAttribute(tx_span::attr::txStatus, tx_span::val::rejectedInnerBatch);
JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
"tfInnerBatchTxn (handleTransaction).";
fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
@@ -1494,12 +1496,20 @@ PeerImp::handleTransaction(
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}
// Erase only if the server has seen this tx. If the server has not
// seen this tx then the tx could not has been queued for this peer.
else if (eraseTxQueue && txReduceRelayEnabled())
else
{
removeTxQueue(txID);
// Recently-seen but not flagged bad — this is the plain
// duplicate-suppression path. Mark it explicitly so the
// span never exits as "new".
span->setAttribute(tx_span::attr::txStatus, tx_span::val::suppressed);
// Erase only if the server has seen this tx. If the server
// has not seen this tx then the tx could not have been
// queued for this peer.
if (eraseTxQueue && txReduceRelayEnabled())
{
removeTxQueue(txID);
}
}
overlay_.reportInboundTraffic(
@@ -1532,10 +1542,12 @@ PeerImp::handleTransaction(
if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
{
span->setAttribute(tx_span::attr::txStatus, tx_span::val::droppedNoSync);
JLOG(p_journal_.trace()) << "No new transactions until synchronized";
}
else if (app_.getJobQueue().getJobCount(jtTRANSACTION) > app_.config().MAX_TRANSACTIONS)
{
span->setAttribute(tx_span::attr::txStatus, tx_span::val::droppedQueueFull);
overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full";
}
@@ -1967,8 +1979,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
// Create a receive span that links to the sender's trace context
// (if propagated). shared_ptr keeps it alive across the job boundary.
auto span = std::make_shared<telemetry::SpanGuard>(telemetry::proposalReceiveSpan(set));
span->setAttribute("xrpl.consensus.trusted", isTrusted);
span->setAttribute("xrpl.consensus.round", static_cast<int64_t>(set.proposeseq()));
span->setAttribute(telemetry::consensus_span::attr::trusted, isTrusted);
span->setAttribute(
telemetry::consensus_span::attr::round, static_cast<int64_t>(set.proposeseq()));
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(
@@ -2552,11 +2565,11 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
// Create a receive span that links to the sender's trace context
// (if propagated). shared_ptr keeps it alive across the job boundary.
auto span = std::make_shared<telemetry::SpanGuard>(telemetry::validationReceiveSpan(*m));
span->setAttribute("xrpl.consensus.trusted", isTrusted);
span->setAttribute(telemetry::consensus_span::attr::trusted, isTrusted);
if (val->isFieldPresent(sfLedgerSequence))
{
span->setAttribute(
"xrpl.consensus.ledger.seq",
telemetry::consensus_span::attr::ledgerSeq,
static_cast<int64_t>(val->getFieldU32(sfLedgerSequence)));
}