Merge branch 'pratik/otel-phase3-tx-tracing' into pratik/otel-phase4-consensus-tracing

This commit is contained in:
Pratik Mankawde
2026-06-03 16:02:26 +01:00
11 changed files with 87 additions and 11 deletions

View File

@@ -2,16 +2,22 @@
#ifdef XRPL_ENABLE_TELEMETRY
#include <xrpl/proto/xrpl.pb.h>
#include <xrpl/telemetry/TraceContextPropagator.h>
#include <opentelemetry/context/context.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <opentelemetry/nostd/span.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/span_id.h>
#include <opentelemetry/trace/span_metadata.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>
#include <cstdint>
#include <cstring>
namespace trace = opentelemetry::trace;
@@ -37,10 +43,11 @@ TEST(TraceContextPropagator, round_trip)
0x10};
std::uint8_t spanIdBuf[8] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x11, 0x22};
trace::TraceId traceId(opentelemetry::nostd::span<uint8_t const, 16>(traceIdBuf, 16));
trace::SpanId spanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBuf, 8));
trace::TraceFlags flags(trace::TraceFlags::kIsSampled);
trace::SpanContext spanCtx(traceId, spanId, flags, true);
trace::TraceId const traceId(
opentelemetry::nostd::span<std::uint8_t const, 16>(traceIdBuf, 16));
trace::SpanId const spanId(opentelemetry::nostd::span<std::uint8_t const, 8>(spanIdBuf, 8));
trace::TraceFlags const flags(trace::TraceFlags::kIsSampled);
trace::SpanContext const spanCtx(traceId, spanId, flags, true);
auto ctx = opentelemetry::context::Context{}.SetValue(
trace::kSpanKey,
@@ -53,7 +60,7 @@ TEST(TraceContextPropagator, round_trip)
EXPECT_EQ(proto.trace_id().size(), 16u);
EXPECT_TRUE(proto.has_span_id());
EXPECT_EQ(proto.span_id().size(), 8u);
EXPECT_EQ(proto.trace_flags(), static_cast<uint32_t>(trace::TraceFlags::kIsSampled));
EXPECT_EQ(proto.trace_flags(), static_cast<std::uint32_t>(trace::TraceFlags::kIsSampled));
EXPECT_EQ(std::memcmp(proto.trace_id().data(), traceIdBuf, 16), 0);
EXPECT_EQ(std::memcmp(proto.span_id().data(), spanIdBuf, 8), 0);
@@ -71,7 +78,7 @@ TEST(TraceContextPropagator, round_trip)
TEST(TraceContextPropagator, extract_empty_protobuf)
{
protocol::TraceContext proto;
protocol::TraceContext const proto;
auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
auto span = trace::GetSpan(ctx);
if (span)
@@ -124,10 +131,10 @@ TEST(TraceContextPropagator, flags_preservation)
std::uint8_t spanIdBuf[8] = {1, 2, 3, 4, 5, 6, 7, 8};
// Test with flags NOT sampled (flags = 0)
trace::TraceFlags flags(0);
trace::SpanContext spanCtx(
trace::TraceId(opentelemetry::nostd::span<uint8_t const, 16>(traceIdBuf, 16)),
trace::SpanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBuf, 8)),
trace::TraceFlags const flags(0);
trace::SpanContext const spanCtx(
trace::TraceId(opentelemetry::nostd::span<std::uint8_t const, 16>(traceIdBuf, 16)),
trace::SpanId(opentelemetry::nostd::span<std::uint8_t const, 8>(spanIdBuf, 8)),
flags,
true);

View File

@@ -1334,6 +1334,15 @@ NetworkOPsImp::processTransaction(
auto span = std::make_shared<SpanGuard>(txProcessSpan(transaction->getID()));
span->setAttribute(tx_span::attr::txHash, to_string(transaction->getID()).c_str());
span->setAttribute(tx_span::attr::local, bLocal);
if (auto const& stx = transaction->getSTransaction())
{
if (auto const* fmt = TxFormats::getInstance().findByType(stx->getTxnType()))
span->setAttribute(tx_span::attr::txType, fmt->getName().c_str());
span->setAttribute(
tx_span::attr::fee, static_cast<int64_t>(stx->getFieldAmount(sfFee).xrp().drops()));
span->setAttribute(
tx_span::attr::sequence, static_cast<int64_t>(stx->getSeqProxy().value()));
}
auto ev = jobQueue_.makeLoadEvent(JtTxnProc, "ProcessTXN");
@@ -1554,6 +1563,11 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
auto newOL = registry_.get().getOpenLedger().current();
for (TransactionStatus const& e : transactions)
{
if (e.span && *e.span)
{
e.span->setAttribute(tx_span::attr::terResult, transToken(e.result).c_str());
e.span->setAttribute(tx_span::attr::applied, e.applied);
}
e.transaction->clearSubmitResult();
if (e.applied)

View File

@@ -55,6 +55,16 @@ inline constexpr auto suppressed = makeStr("suppressed");
inline constexpr auto txStatus = makeStr("tx_status");
/// "peer_version" — version of peer that sent the tx.
inline constexpr auto peerVersion = makeStr("peer_version");
/// "tx_type" — transaction type name (e.g., "Payment", "OfferCreate").
inline constexpr auto txType = makeStr("tx_type");
/// "fee" — transaction fee in drops.
inline constexpr auto fee = makeStr("fee");
/// "sequence" — transaction sequence number.
inline constexpr auto sequence = makeStr("sequence");
/// "ter_result" — engine result code after application.
inline constexpr auto terResult = makeStr("ter_result");
/// "applied" — whether the transaction was applied to the ledger.
inline constexpr auto applied = makeStr("applied");
} // namespace attr
// ===== Attribute values ====================================================

View File

@@ -28,6 +28,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/SeqProxy.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/Units.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/protocol/jss.h>
@@ -741,6 +742,8 @@ TxQ::apply(
auto span =
SpanGuard::span(TraceCategory::Transactions, txq_span::prefix::txq, txq_span::op::enqueue);
span.setAttribute(txq_span::attr::txHash, to_string(tx->getTransactionID()).c_str());
if (auto const* fmt = TxFormats::getInstance().findByType(tx->getTxnType()))
span.setAttribute(txq_span::attr::txType, fmt->getName().c_str());
NumberSO const stNumberSO{view.rules().enabled(fixUniversalNumber)};
@@ -1477,6 +1480,7 @@ TxQ::accept(Application& app, OpenView& view)
if (didApply)
{
txSpan.setAttribute(txq_span::attr::txqStatus, txq_span::val::applied);
// Remove the candidate from the queue
JLOG(j_.debug()) << "Queued transaction " << candidateIter->txID
<< " applied successfully with " << transToken(txnResult)
@@ -1497,12 +1501,14 @@ TxQ::accept(Application& app, OpenView& view)
{
account.dropPenalty = true;
}
txSpan.setAttribute(txq_span::attr::txqStatus, txq_span::val::failed);
JLOG(j_.debug()) << "Queued transaction " << candidateIter->txID << " failed with "
<< transToken(txnResult) << ". Remove from queue.";
candidateIter = eraseAndAdvance(candidateIter);
}
else
{
txSpan.setAttribute(txq_span::attr::txqStatus, txq_span::val::retried);
JLOG(j_.debug()) << "Queued transaction " << candidateIter->txID << " failed with "
<< transToken(txnResult) << ". Leave in queue."
<< " Applied: " << didApply << ". Flags: " << candidateIter->flags;
@@ -1598,6 +1604,7 @@ TxQ::accept(Application& app, OpenView& view)
}
}
XRPL_ASSERT(byFee_.size() == startingSize, "xrpl::TxQ::accept : byFee size match");
span.setAttribute(txq_span::attr::ledgerChanged, ledgerChanged);
return ledgerChanged;
}

View File

@@ -93,6 +93,8 @@ inline constexpr auto terCode = makeStr("ter_code");
inline constexpr auto retriesRemaining = makeStr("retries_remaining");
/// "num_cleared" — entries cleared in batch.
inline constexpr auto numCleared = makeStr("num_cleared");
/// "tx_type" — transaction type name (e.g., "Payment", "OfferCreate").
inline constexpr auto txType = makeStr("tx_type");
} // namespace attr
// ===== Attribute values ====================================================

View File

@@ -54,6 +54,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/tokens.h>
@@ -1331,6 +1332,8 @@ PeerImp::handleTransaction(
auto span = std::make_shared<SpanGuard>(txReceiveSpan(txID, *m));
span->setAttribute(tx_span::attr::txHash, to_string(txID).c_str());
span->setAttribute(tx_span::attr::peerId, static_cast<int64_t>(id_));
if (auto const* fmt = TxFormats::getInstance().findByType(stx->getTxnType()))
span->setAttribute(tx_span::attr::txType, fmt->getName().c_str());
if (auto const version = getVersion(); !version.empty())
span->setAttribute(tx_span::attr::peerVersion, version.c_str());
// Set defaults for conditional attributes so they are always present

View File

@@ -84,6 +84,12 @@ inline constexpr auto numPaths = makeStr("pathfind_num_paths");
inline constexpr auto numRequests = makeStr("pathfind_num_requests");
/// "pathfind_ledger_index" — pathfind target ledger index.
inline constexpr auto ledgerIndex = makeStr("pathfind_ledger_index");
/// "pathfind_dest_amount" — requested destination amount as string.
inline constexpr auto destAmount = makeStr("pathfind_dest_amount");
/// "pathfind_dest_currency" — destination currency code.
inline constexpr auto destCurrency = makeStr("pathfind_dest_currency");
/// "pathfind_num_source_assets" — candidate source assets count.
inline constexpr auto numSourceAssets = makeStr("pathfind_num_source_assets");
} // namespace attr
} // namespace xrpl::telemetry::pathfind_span

View File

@@ -594,6 +594,8 @@ PathRequest::findPaths(
auto span = SpanGuard::span(
TraceCategory::Rpc, pathfind_span::prefix::pathfind, pathfind_span::op::discover);
span.setAttribute(pathfind_span::attr::searchLevel, static_cast<int64_t>(level));
span.setAttribute(
pathfind_span::attr::numSourceAssets, static_cast<int64_t>(sourceAssets.size()));
std::int64_t totalPaths = 0;
for (auto const& asset : sourceAssets)
@@ -740,6 +742,8 @@ PathRequest::doUpdate(
auto span = SpanGuard::span(
TraceCategory::Rpc, pathfind_span::prefix::pathfind, pathfind_span::op::compute);
span.setAttribute(pathfind_span::attr::fast, fast);
span.setAttribute(pathfind_span::attr::destAmount, saDstAmount_.getFullText().c_str());
span.setAttribute(pathfind_span::attr::destCurrency, to_string(saDstAmount_.asset()).c_str());
JLOG(journal_.debug()) << iIdentifier_ << " update " << (fast ? "fast" : "normal");

View File

@@ -185,6 +185,7 @@ callMethod(JsonContext& context, Method method, std::string const& name, Object&
JLOG(context.j.debug()) << "RPC call " << name << " completed in "
<< ((end - start).count() / 1000000000.0) << "seconds";
perfLog.rpcFinish(name, curId);
span.setAttribute(rpc_span::attr::loadType, context.loadType.label().c_str());
// Status::operator bool() returns true when there IS an error
// (code_ != OK), so the ternary correctly maps error->error, ok->success.
span.setAttribute(

View File

@@ -144,6 +144,12 @@ inline constexpr auto rpcRole = makeStr("rpc_role");
inline constexpr auto rpcStatus = makeStr("rpc_status");
/// "request_payload_size" — bytes of inbound request payload.
inline constexpr auto requestPayloadSize = makeStr("request_payload_size");
/// "is_batch" — whether request is a JSON-RPC batch.
inline constexpr auto isBatch = makeStr("is_batch");
/// "batch_size" — number of sub-requests in a batch.
inline constexpr auto batchSize = makeStr("batch_size");
/// "load_type" — resource cost category after execution.
inline constexpr auto loadType = makeStr("load_type");
} // namespace attr
// ===== Attribute values ====================================================

View File

@@ -63,6 +63,7 @@
#include <algorithm>
#include <cctype>
#include <chrono>
#include <cstdint>
#include <exception>
#include <map>
#include <memory>
@@ -428,6 +429,15 @@ ServerHandler::processSession(
json::Value const& jv)
{
auto span = SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::wsMessage);
if (jv.isMember(jss::command) && jv[jss::command].isString())
{
span.setAttribute(rpc_span::attr::command, jv[jss::command].asString().c_str());
}
else if (jv.isMember(jss::method) && jv[jss::method].isString())
{
span.setAttribute(rpc_span::attr::command, jv[jss::method].asString().c_str());
}
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(journal_))
{
@@ -576,9 +586,12 @@ ServerHandler::processSession(
auto span =
SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::httpRequest);
auto const requestBody = ::xrpl::buffersToString(session->request().body().data());
span.setAttribute(rpc_span::attr::requestPayloadSize, static_cast<int64_t>(requestBody.size()));
processRequest(
session->port(),
::xrpl::buffersToString(session->request().body().data()),
requestBody,
session->remoteAddress().atPort(0),
makeOutput(*session),
coro,
@@ -657,6 +670,9 @@ ServerHandler::processRequest(
}
size = jsonOrig[jss::params].size();
}
span.setAttribute(rpc_span::attr::isBatch, batch);
if (batch)
span.setAttribute(rpc_span::attr::batchSize, static_cast<int64_t>(size));
json::Value reply(batch ? json::ValueType::Array : json::ValueType::Object);
auto const start(std::chrono::high_resolution_clock::now());