Merge branch 'pratik/otel-phase2-rpc-tracing' into pratik/otel-phase3-tx-tracing

This commit is contained in:
Pratik Mankawde
2026-06-10 16:05:29 +01:00
11 changed files with 111 additions and 44 deletions

View File

@@ -47,6 +47,7 @@ libxrpl.shamap > xrpl.nodestore
libxrpl.shamap > xrpl.protocol
libxrpl.shamap > xrpl.shamap
libxrpl.telemetry > xrpl.basics
libxrpl.telemetry > xrpl.config
libxrpl.telemetry > xrpl.telemetry
libxrpl.tx > xrpl.basics
libxrpl.tx > xrpl.conditions
@@ -251,7 +252,7 @@ xrpl.server > xrpl.shamap
xrpl.shamap > xrpl.basics
xrpl.shamap > xrpl.nodestore
xrpl.shamap > xrpl.protocol
xrpl.telemetry > xrpl.basics
xrpl.telemetry > xrpl.config
xrpl.telemetry > xrpld.consensus
xrpl.tx > xrpl.basics
xrpl.tx > xrpl.core

View File

@@ -206,7 +206,7 @@ target_link_libraries(
add_module(xrpl telemetry)
target_link_libraries(
xrpl.libxrpl.telemetry
PUBLIC xrpl.libxrpl.basics xrpl.libxrpl.beast
PUBLIC xrpl.libxrpl.basics xrpl.libxrpl.beast xrpl.libxrpl.config
)
if(telemetry)
target_link_libraries(

View File

@@ -96,7 +96,7 @@ datasources:
tag: command
operator: "="
scope: span
type: static
type: dynamic
- id: rpc-status
tag: rpc_status
operator: "="

View File

@@ -83,8 +83,8 @@
The OTel SDK's TracerProvider and Tracer are internally thread-safe.
*/
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/config/BasicConfig.h>
#include <atomic>
#include <chrono>
@@ -148,11 +148,11 @@ public:
std::string serviceName = "xrpld";
/** OTel resource attribute `service.version` (set from BuildInfo). */
std::string serviceVersion{};
std::string serviceVersion;
/** OTel resource attribute `service.instance.id` (defaults to node
public key). */
std::string serviceInstanceId{};
std::string serviceInstanceId;
/** OTLP/HTTP endpoint URL where spans are sent. */
std::string exporterEndpoint = "http://localhost:4318/v1/traces";
@@ -161,7 +161,7 @@ public:
bool useTls = false;
/** Path to a CA certificate bundle for TLS verification. */
std::string tlsCertPath{};
std::string tlsCertPath;
/** Head-based sampling ratio. Intentionally fixed at 1.0 (sample
everything) and NOT read from config. A per-node ratio would let

View File

@@ -449,6 +449,14 @@ SpanGuard::discard()
{
gTlDiscardCurrentSpan = true;
impl_->span->End();
// Clear here so discard() owns the flag's whole lifetime
// (set -> End -> clear) in one scope, rather than relying on
// FilteringSpanProcessor::OnEnd() to clear it. Today every valid guard
// wraps a recording span (head sampling is 1.0), so OnEnd() always runs
// and clearing here is equivalent — but colocating set and clear keeps
// the flag leak-proof if a later phase can hand back a non-recording
// span (e.g. honoring a non-sampled remote parent during propagation).
gTlDiscardCurrentSpan = false;
impl_->span = nullptr; // prevent ~Impl from calling End() again
impl_.reset();
}

View File

@@ -123,8 +123,7 @@ public:
{
// SpanGuard::discard() set the flag on this thread just before
// calling Span::End(), which invokes OnEnd() synchronously.
// Clear the flag and drop the span.
gTlDiscardCurrentSpan = false;
// Drop the span.
return;
}
delegate_->OnEnd(std::move(span));

View File

@@ -7,7 +7,7 @@
See cfg/xrpld-example.cfg for the full list of available options.
*/
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/config/BasicConfig.h>
#include <xrpl/telemetry/Telemetry.h>
#include <chrono>

View File

@@ -177,7 +177,11 @@ private:
void
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
void
/** Process an RPC request and write the reply to `output`.
@return false if the request resulted in an error response, true
otherwise. Lets the caller's enclosing span reflect the outcome.
*/
bool
processRequest(
Port const& port,
std::string const& request,

View File

@@ -192,8 +192,17 @@ callMethod(JsonContext& context, Method method, std::string const& name, Object&
rpc_span::attr::rpcStatus,
ret ? std::string_view{rpc_span::val::error}
: std::string_view{rpc_span::val::success});
if (!ret)
// Reflect the result in the OTel span status, not just the attribute,
// so non-exception RPC errors (rpcTOO_BUSY, rpcNO_PERMISSION, ...) are
// visible to {status.code=error} queries.
if (ret)
{
span.setError(rpc_span::val::error);
}
else
{
span.setOk();
}
return ret;
}
catch (std::exception& e)
@@ -238,6 +247,7 @@ doCommand(RPC::JsonContext& context, json::Value& result)
// "unknown" name only when the request truly omits both fields.
auto span = SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::command, cmdName);
span.setAttribute(rpc_span::attr::command, cmdName.c_str());
span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
span.setError(getErrorInfo(error).token.cStr());
injectError(error, result);

View File

@@ -159,7 +159,7 @@ using telemetry::attr_val::error;
using telemetry::attr_val::success;
inline constexpr auto admin = makeStr("admin");
inline constexpr auto user = makeStr("user");
inline constexpr auto unknownCommand = makeStr("unknown_command");
inline constexpr auto unknownCommand = makeStr("unknown");
/// "invalid_json" — WS message parse failure or oversize.
inline constexpr auto invalidJson = makeStr("invalid_json");
} // namespace val

View File

@@ -444,6 +444,8 @@ ServerHandler::processSession(
session->close({boost::beast::websocket::policy_error, "threshold exceeded"});
// FIX: This rpcError is not delivered since the session
// was just closed.
span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
span.setError("resource threshold exceeded");
return rpcError(RpcSlowDown);
}
@@ -475,6 +477,8 @@ ServerHandler::processSession(
jr[jss::api_version] = jv[jss::api_version];
is->getConsumer().charge(Resource::kFeeMalformedRpc);
span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
span.setError(jr[jss::error].asString());
return jr;
}
@@ -555,12 +559,19 @@ ServerHandler::processSession(
}
jr[jss::request] = rq;
// Mark the span according to the final result. Doing it here (rather
// than an unconditional setOk later) ensures error responses — from
// doCommand, a FORBID role, or the catch block above — are not
// overwritten as OK.
span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
span.setError(jr[jss::error].asString());
}
else
{
if (jr[jss::result].isMember("forwarded") && jr[jss::result]["forwarded"])
jr = jr[jss::result];
jr[jss::status] = jss::success;
span.setOk();
}
if (jv.isMember(jss::id))
@@ -573,7 +584,6 @@ ServerHandler::processSession(
jr[jss::api_version] = jv[jss::api_version];
jr[jss::type] = jss::response;
span.setOk();
return jr;
}
@@ -589,7 +599,7 @@ ServerHandler::processSession(
auto const requestBody = ::xrpl::buffersToString(session->request().body().data());
span.setAttribute(rpc_span::attr::requestPayloadSize, static_cast<int64_t>(requestBody.size()));
processRequest(
bool const ok = processRequest(
session->port(),
requestBody,
session->remoteAddress().atPort(0),
@@ -611,7 +621,15 @@ ServerHandler::processSession(
{
session->close(true);
}
span.setOk();
// Reflect the request outcome on the wrapper span instead of always marking it OK.
if (ok)
{
span.setOk();
}
else
{
span.setError(rpc_span::val::error);
}
}
static json::Value
@@ -630,7 +648,7 @@ constexpr json::Int kServerOverloaded = -32604;
constexpr json::Int kForbidden = -32605;
constexpr json::Int kWrongVersion = -32606;
void
bool
ServerHandler::processRequest(
Port const& port,
std::string const& request,
@@ -643,18 +661,30 @@ ServerHandler::processRequest(
auto span = SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::process);
auto rpcJ = app_.getJournal("RPC");
// Tracks whether any failure occurred. Set on every error path (early
// returns, the catch block, and per-request error replies) and used at the
// end to mark the span status. The HTTP status code alone is insufficient:
// it stays 200 for batch responses and for ripplerpc < 3.0, so relying on
// it would let payload-level errors end the span as successful.
bool spanHadError = false;
// Marks the span as failed before sending an error reply, so the
// early-return validation paths below are recorded as errors (the span
// would otherwise end UNSET and be invisible to {status.code=error}).
auto httpReplyError = [&](int status, std::string const& message) {
spanHadError = true;
span.setError(message);
httpReply(status, message, output, rpcJ);
};
json::Value jsonOrig;
{
json::Reader reader;
if ((request.size() > RPC::Tuning::kMaxRequestSize) || !reader.parse(request, jsonOrig) ||
!jsonOrig || !jsonOrig.isObject())
{
httpReply(
400,
"Unable to parse request: " + reader.getFormattedErrorMessages(),
output,
rpcJ);
return;
httpReplyError(400, "Unable to parse request: " + reader.getFormattedErrorMessages());
return false;
}
}
@@ -665,8 +695,8 @@ ServerHandler::processRequest(
batch = true;
if (!jsonOrig.isMember(jss::params) || !jsonOrig[jss::params].isArray())
{
httpReply(400, "Malformed batch request", output, rpcJ);
return;
httpReplyError(400, "Malformed batch request");
return false;
}
size = jsonOrig[jss::params].size();
}
@@ -707,8 +737,8 @@ ServerHandler::processRequest(
{
if (!batch)
{
httpReply(400, jss::invalid_API_version.cStr(), output, rpcJ);
return;
httpReplyError(400, jss::invalid_API_version.cStr());
return false;
}
json::Value r(json::ValueType::Object);
r[jss::request] = jsonRPC;
@@ -750,8 +780,8 @@ ServerHandler::processRequest(
{
if (!batch)
{
httpReply(503, "Server is overloaded", output, rpcJ);
return;
httpReplyError(503, "Server is overloaded");
return false;
}
json::Value r = jsonRPC;
r[jss::error] = makeJsonError(kServerOverloaded, "Server is overloaded");
@@ -765,8 +795,8 @@ ServerHandler::processRequest(
usage.charge(Resource::kFeeMalformedRpc);
if (!batch)
{
httpReply(403, "Forbidden", output, rpcJ);
return;
httpReplyError(403, "Forbidden");
return false;
}
json::Value r = jsonRPC;
r[jss::error] = makeJsonError(kForbidden, "Forbidden");
@@ -779,8 +809,8 @@ ServerHandler::processRequest(
usage.charge(Resource::kFeeMalformedRpc);
if (!batch)
{
httpReply(400, "Null method", output, rpcJ);
return;
httpReplyError(400, "Null method");
return false;
}
json::Value r = jsonRPC;
r[jss::error] = makeJsonError(kMethodNotFound, "Null method");
@@ -794,8 +824,8 @@ ServerHandler::processRequest(
usage.charge(Resource::kFeeMalformedRpc);
if (!batch)
{
httpReply(400, "method is not string", output, rpcJ);
return;
httpReplyError(400, "method is not string");
return false;
}
json::Value r = jsonRPC;
r[jss::error] = makeJsonError(kMethodNotFound, "method is not string");
@@ -809,8 +839,8 @@ ServerHandler::processRequest(
usage.charge(Resource::kFeeMalformedRpc);
if (!batch)
{
httpReply(400, "method is empty", output, rpcJ);
return;
httpReplyError(400, "method is empty");
return false;
}
json::Value r = jsonRPC;
r[jss::error] = makeJsonError(kMethodNotFound, "method is empty");
@@ -835,8 +865,8 @@ ServerHandler::processRequest(
else if (!params.isArray() || params.size() != 1)
{
usage.charge(Resource::kFeeMalformedRpc);
httpReply(400, "params unparsable", output, rpcJ);
return;
httpReplyError(400, "params unparsable");
return false;
}
else
{
@@ -844,8 +874,8 @@ ServerHandler::processRequest(
if (!params.isObjectOrNull())
{
usage.charge(Resource::kFeeMalformedRpc);
httpReply(400, "params unparsable", output, rpcJ);
return;
httpReplyError(400, "params unparsable");
return false;
}
}
}
@@ -862,8 +892,8 @@ ServerHandler::processRequest(
usage.charge(Resource::kFeeMalformedRpc);
if (!batch)
{
httpReply(400, "ripplerpc is not a string", output, rpcJ);
return;
httpReplyError(400, "ripplerpc is not a string");
return false;
}
json::Value r = jsonRPC;
@@ -922,6 +952,7 @@ ServerHandler::processRequest(
<< " when processing request: " << json::Compact{json::Value{params}};
span.recordException(ex);
span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
spanHadError = true;
// LCOV_EXCL_STOP
}
@@ -938,6 +969,7 @@ ServerHandler::processRequest(
{
if (result.isMember(jss::error))
{
spanHadError = true;
result[jss::status] = jss::error;
result["code"] = result[jss::error_code];
result["message"] = result[jss::error_message];
@@ -958,6 +990,7 @@ ServerHandler::processRequest(
// received.
if (result.isMember(jss::error))
{
spanHadError = true;
auto rq = params;
if (rq.isObject())
@@ -1053,8 +1086,20 @@ ServerHandler::processRequest(
}
}
span.setOk();
// Mark the span as an error if any request failed or the HTTP status is an
// error. spanHadError catches payload-level errors that httpStatus misses
// (batch responses and ripplerpc < 3.0 always return HTTP 200).
if (spanHadError || httpStatus >= 400)
{
span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
span.setError(rpc_span::val::error);
}
else
{
span.setOk();
}
httpReply(httpStatus, response, output, rpcJ);
return !spanHadError;
}
//------------------------------------------------------------------------------