feat(telemetry): instrument missing critical/medium RPC span paths

Add spans to previously uninstrumented error and validation paths:

- gRPC: span in CallData::process(coro) with method name attribute,
  covers all 4 gRPC endpoints (GetLedger, GetLedgerData, etc.)
- WebSocket parse errors: span in onWSMessage() for invalid JSON
- WebSocket upgrade failures: span in onHandoff() try/catch
- Command dispatch rejections: span in doCommand() when fillHandler()
  fails (unknown command, too busy, permission denied)

New files: GrpcSpanNames.h (gRPC span constants)
Modified: GRPCServer.h (name_ member), RpcSpanNames.h (wsUpgrade op,
updated coverage diagram)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-04-20 14:47:25 +01:00
parent aeacede345
commit 759d425eb7
6 changed files with 142 additions and 11 deletions

View File

@@ -1,9 +1,11 @@
#include <xrpld/app/main/GRPCServer.h>
#include <xrpld/app/main/GrpcSpanNames.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/telemetry/SpanGuard.h>
namespace xrpl {
@@ -47,7 +49,8 @@ GRPCServerImpl::CallData<Request, Response>::CallData(
Forward<Request, Response> forward,
RPC::Condition requiredCondition,
Resource::Charge loadType,
std::vector<boost::asio::ip::address> const& secureGatewayIPs)
std::vector<boost::asio::ip::address> const& secureGatewayIPs,
std::string_view name)
: service_(service)
, cq_(cq)
, finished_(false)
@@ -59,6 +62,7 @@ GRPCServerImpl::CallData<Request, Response>::CallData(
, requiredCondition_(requiredCondition)
, loadType_(std::move(loadType))
, secureGatewayIPs_(secureGatewayIPs)
, name_(name)
{
// Bind a listener. When a request is received, "this" will be returned
// from CompletionQueue::Next
@@ -116,12 +120,18 @@ template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
{
using namespace telemetry;
auto span =
SpanGuard::span(TraceCategory::Rpc, grpc_span::prefix::grpc, grpc_span::op::request);
span.setAttribute(grpc_span::attr::method, name_);
try
{
auto usage = getUsage();
bool const isUnlimited = clientIsUnlimited();
if (!isUnlimited && usage.disconnect(app_.getJournal("gRPCServer")))
{
span.setError("resource_exhausted");
grpc::Status const status{
grpc::StatusCode::RESOURCE_EXHAUSTED, "usage balance exceeds threshold"};
responder_.FinishWithError(status, this);
@@ -167,6 +177,7 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
if (conditionMetRes != rpcSUCCESS)
{
RPC::ErrorInfo const errorInfo = RPC::get_error_info(conditionMetRes);
span.setError(errorInfo.token.c_str());
grpc::Status const status{
grpc::StatusCode::FAILED_PRECONDITION, errorInfo.message.c_str()};
responder_.FinishWithError(status, this);
@@ -175,12 +186,14 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
{
std::pair<Response, grpc::Status> result = handler_(context);
setIsUnlimited(result.first, isUnlimited);
span.setOk();
responder_.Finish(result.first, result.second, this);
}
}
}
catch (std::exception const& ex)
{
span.recordException(ex);
grpc::Status const status{grpc::StatusCode::INTERNAL, ex.what()};
responder_.FinishWithError(status, this);
}
@@ -458,7 +471,8 @@ GRPCServerImpl::setupListeners()
&org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
RPC::NO_CONDITION,
Resource::feeMediumBurdenRPC,
secureGatewayIPs_));
secureGatewayIPs_,
"GetLedger"));
}
{
using cd = CallData<
@@ -475,7 +489,8 @@ GRPCServerImpl::setupListeners()
&org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
RPC::NO_CONDITION,
Resource::feeMediumBurdenRPC,
secureGatewayIPs_));
secureGatewayIPs_,
"GetLedgerData"));
}
{
using cd = CallData<
@@ -492,7 +507,8 @@ GRPCServerImpl::setupListeners()
&org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
RPC::NO_CONDITION,
Resource::feeMediumBurdenRPC,
secureGatewayIPs_));
secureGatewayIPs_,
"GetLedgerDiff"));
}
{
using cd = CallData<
@@ -509,7 +525,8 @@ GRPCServerImpl::setupListeners()
&org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
RPC::NO_CONDITION,
Resource::feeMediumBurdenRPC,
secureGatewayIPs_));
secureGatewayIPs_,
"GetLedgerEntry"));
}
return requests;
}

View File

@@ -13,6 +13,8 @@
#include <grpcpp/grpcpp.h>
#include <string_view>
namespace xrpl {
// Interface that CallData implements
@@ -174,6 +176,9 @@ private:
std::vector<boost::asio::ip::address> const& secureGatewayIPs_;
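/// Human-readable name for telemetry spans (e.g. "GetLedger"). This is a
/// non-owning view: the referenced characters must outlive this CallData
/// (the setupListeners() call sites pass string literals).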
std::string_view name_;
public:
virtual ~CallData() = default;
@@ -189,7 +194,8 @@ private:
Forward<Request, Response> forward,
RPC::Condition requiredCondition,
Resource::Charge loadType,
std::vector<boost::asio::ip::address> const& secureGatewayIPs);
std::vector<boost::asio::ip::address> const& secureGatewayIPs,
std::string_view name = "");
CallData(CallData const&) = delete;

View File

@@ -0,0 +1,64 @@
#pragma once
/** Compile-time span name constants for the gRPC subsystem.
*
* All span prefixes, operation names, and attribute keys used by gRPC
* tracing call sites are defined here. Built on the StaticStr/join()
* primitives from <xrpl/telemetry/SpanNames.h>.
*
* Span hierarchy:
*
* +-------------------------------------------------------+
* | grpc.request |
* | CallData::process(coro) |
* | attrs: method (role/status keys are defined below but |
* | are not set by process()) |
* +-------------------------------------------------------+
*
* Unlike the HTTP/WS RPC path, gRPC has a flat single-span structure
* per request since each CallData handles exactly one RPC method.
*/
#include <xrpl/telemetry/SpanNames.h>
namespace xrpl::telemetry::grpc_span {

/** Leading segments of gRPC span names. */
namespace prefix {
/// "grpc" — first segment of every gRPC transport span name.
inline constexpr auto grpc = makeStr("grpc");
}  // namespace prefix

/** Trailing operation segments of gRPC span names. */
namespace op {
/// "request" — the per-request operation; the diagram in this header's
/// top comment shows the resulting span as "grpc.request".
inline constexpr auto request = makeStr("request");
}  // namespace op

/** Attribute keys attached to gRPC spans. */
namespace attr {
/// Common base that the gRPC attribute keys below are joined onto.
inline constexpr auto xrplGrpc = join(seg::xrpl, makeStr("grpc"));
/// "xrpl.grpc.method"
inline constexpr auto method = join(xrplGrpc, makeStr("method"));
/// "xrpl.grpc.role"
inline constexpr auto role = join(xrplGrpc, makeStr("role"));
/// "xrpl.grpc.status"
inline constexpr auto status = join(xrplGrpc, makeStr("status"));
}  // namespace attr

/** Attribute values used with the keys above. */
namespace val {
// Re-export the generic outcome values so gRPC call sites can spell them
// through grpc_span::val alongside the gRPC-specific ones.
using telemetry::attr_val::error;
using telemetry::attr_val::success;
/// "resource_exhausted"
inline constexpr auto resourceExhausted = makeStr("resource_exhausted");
/// "failed_precondition"
inline constexpr auto failedPrecondition = makeStr("failed_precondition");
}  // namespace val

}  // namespace xrpl::telemetry::grpc_span

View File

@@ -209,6 +209,14 @@ doCommand(RPC::JsonContext& context, Json::Value& result)
Handler const* handler = nullptr;
if (auto error = fillHandler(context, handler))
{
std::string const cmdName = context.params.isMember(jss::command)
? context.params[jss::command].asString()
: context.params.isMember(jss::method) ? context.params[jss::method].asString()
: "unknown";
auto span = SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::command, cmdName);
span.setAttribute(rpc_span::attr::command, cmdName.c_str());
span.setError(get_error_info(error).token.c_str());
inject_error(error, result);
return error;
}

View File

@@ -64,21 +64,48 @@
* | +--------------------------------------------------+ |
* +-------------------------------------------------------+
*
* WebSocket error paths:
*
* +-------------------------------------------------------+
* | rpc.ws_message (error: invalid_json) |
* | ServerHandler::onWSMessage() — parse failure |
* +-------------------------------------------------------+
*
* +-------------------------------------------------------+
* | rpc.ws_upgrade (ok on success; exception on failure) |
* | ServerHandler::onHandoff() — upgrade try/catch |
* +-------------------------------------------------------+
*
* Command dispatch error path:
*
* +-------------------------------------------------------+
* | rpc.command.{name} (error: too_busy/unknown/etc) |
* | RPC::doCommand() — fillHandler() rejection |
* +-------------------------------------------------------+
*
* gRPC path (see GrpcSpanNames.h for constants):
*
* +-------------------------------------------------------+
* | grpc.request |
* | CallData::process(coro) |
* | attrs: method, status |
* +-------------------------------------------------------+
*
* Covered paths:
* - HTTP JSON-RPC (single and batch requests)
* - WebSocket RPC commands
* - WebSocket message parse errors (invalid JSON, oversized)
* - WebSocket upgrade failures (protocol handshake errors)
* - Admin CLI (connects via HTTP internally)
* - Command dispatch rejections (unknown cmd, too busy, no perm)
* - gRPC endpoints (GetLedger, GetLedgerData, GetLedgerDiff,
* GetLedgerEntry)
* - Command execution: timing, success/failure, exceptions
* - Per-command attributes: name, API version, role, status
*
* Known gaps (not yet instrumented):
* - gRPC endpoints (GRPCServer.cpp) — no spans at all
* - Early validation errors in processRequest() before rpc.process
* span (malformed JSON, auth failures, oversized requests)
* - fillHandler() rejections in doCommand() before rpc.command
* span (unknown command, too busy, permission denied)
* - WebSocket upgrade failures in onHandoff()
* - WebSocket message parse errors in onWSMessage()
* - Subscription push notifications (server-initiated, not RPC)
*/
@@ -101,6 +128,7 @@ inline constexpr auto command = join(seg::rpc, makeStr("command"));
namespace op {
inline constexpr auto wsMessage = makeStr("ws_message");
inline constexpr auto wsUpgrade = makeStr("ws_upgrade");
inline constexpr auto httpRequest = makeStr("http_request");
inline constexpr auto process = makeStr("process");
} // namespace op

View File

@@ -191,13 +191,17 @@ ServerHandler::onHandoff(
if (!is_ws)
return statusRequestResponse(request, http::status::unauthorized);
auto span =
SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::wsUpgrade);
std::shared_ptr<WSSession> ws;
try
{
ws = session.websocketUpgrade();
span.setOk();
}
catch (std::exception const& e)
{
span.recordException(e);
JLOG(m_journal.error()) << "Exception upgrading websocket: " << e.what() << "\n";
return statusRequestResponse(request, http::status::internal_server_error);
}
@@ -308,6 +312,10 @@ ServerHandler::onWSMessage(
auto const size = boost::asio::buffer_size(buffers);
if (size > RPC::Tuning::maxRequestSize || !Json::Reader{}.parse(jv, buffers) || !jv.isObject())
{
auto span =
SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::wsMessage);
span.setError("invalid_json");
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid";