From 8707cc7d48fd73ffaea2dda01888239f161a08bf Mon Sep 17 00:00:00 2001 From: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:22:12 +0000 Subject: [PATCH] Phase 1c: RPC integration - ServerHandler tracing, telemetry config wiring Co-Authored-By: Claude Opus 4.6 --- .../scripts/levelization/results/ordering.txt | 2 + presentation.md | 280 +++++++++++++ src/xrpld/rpc/detail/RPCHandler.cpp | 9 + src/xrpld/rpc/detail/ServerHandler.cpp | 396 ++++++++++-------- src/xrpld/telemetry/TracingInstrumentation.h | 115 +++++ 5 files changed, 617 insertions(+), 185 deletions(-) create mode 100644 presentation.md create mode 100644 src/xrpld/telemetry/TracingInstrumentation.h diff --git a/.github/scripts/levelization/results/ordering.txt b/.github/scripts/levelization/results/ordering.txt index cb35a7e6cc..4187e99657 100644 --- a/.github/scripts/levelization/results/ordering.txt +++ b/.github/scripts/levelization/results/ordering.txt @@ -268,6 +268,7 @@ xrpld.perflog > xrpl.json xrpld.rpc > xrpl.basics xrpld.rpc > xrpl.core xrpld.rpc > xrpld.core +xrpld.rpc > xrpld.telemetry xrpld.rpc > xrpl.json xrpld.rpc > xrpl.ledger xrpld.rpc > xrpl.net @@ -278,3 +279,4 @@ xrpld.rpc > xrpl.resource xrpld.rpc > xrpl.server xrpld.rpc > xrpl.tx xrpld.shamap > xrpl.shamap +xrpld.telemetry > xrpl.telemetry diff --git a/presentation.md b/presentation.md new file mode 100644 index 0000000000..7a443a635c --- /dev/null +++ b/presentation.md @@ -0,0 +1,280 @@ +# OpenTelemetry Distributed Tracing for rippled + +--- + +## Slide 1: Introduction + +### What is OpenTelemetry? + +OpenTelemetry is an open-source, CNCF-backed observability framework for distributed tracing, metrics, and logs. + +### Why OpenTelemetry for rippled? + +- **End-to-End Transaction Visibility**: Track transactions from submission → consensus → ledger inclusion +- **Cross-Node Correlation**: Follow requests across multiple independent nodes using a unique `trace_id` +- **Consensus Round Analysis**: Understand timing and behavior across validators +- **Incident Debugging**: Correlate events across distributed nodes during issues + +```mermaid +flowchart LR + A["Node A
tx.receive
trace_id: abc123"] --> B["Node B
tx.relay
trace_id: abc123"] --> C["Node C
tx.validate
trace_id: abc123"] --> D["Node D
ledger.apply
trace_id: abc123"] + + style A fill:#1565c0,stroke:#0d47a1,color:#fff + style B fill:#2e7d32,stroke:#1b5e20,color:#fff + style C fill:#2e7d32,stroke:#1b5e20,color:#fff + style D fill:#e65100,stroke:#bf360c,color:#fff +``` + +> **Trace ID: abc123** — All nodes share the same trace, enabling cross-node correlation. + +--- + +## Slide 2: OpenTelemetry vs Open Source Alternatives + +| Feature | OpenTelemetry | Jaeger | Zipkin | SkyWalking | Pinpoint | Prometheus | +| ------------------- | ---------------- | ---------------- | ------------------ | ---------- | ---------- | ---------- | +| **Tracing** | YES | YES | YES | YES | YES | NO | +| **Metrics** | YES | NO | NO | YES | YES | YES | +| **Logs** | YES | NO | NO | YES | NO | NO | +| **C++ SDK** | YES Official | YES (Deprecated) | YES (Unmaintained) | NO | NO | YES | +| **Vendor Neutral** | YES Primary goal | NO | NO | NO | NO | NO | +| **Instrumentation** | Manual + Auto | Manual | Manual | Auto-first | Auto-first | Manual | +| **Backend** | Any (exporters) | Self | Self | Self | Self | Self | +| **CNCF Status** | Incubating | Graduated | NO | Incubating | NO | Graduated | + +> **Why OpenTelemetry?** It's the only actively maintained, full-featured C++ option with vendor neutrality — allowing export to Jaeger, Prometheus, Grafana, or any commercial backend without changing instrumentation. + +--- + +## Slide 3: Comparison with rippled's Existing Solutions + +### Current Observability Stack + +| Aspect | PerfLog (JSON) | StatsD (Metrics) | OpenTelemetry (NEW) | +| --------------------- | --------------------- | --------------------- | --------------------------- | +| **Type** | Logging | Metrics | Distributed Tracing | +| **Scope** | Single node | Single node | **Cross-node** | +| **Data** | JSON log entries | Counters, gauges | Spans with context | +| **Correlation** | By timestamp | By metric name | By `trace_id` | +| **Overhead** | Low (file I/O) | Low (UDP) | Low-Medium (configurable) | +| **Question Answered** | "What happened here?" | "How many? How fast?" | **"What was the journey?"** | + +### Use Case Matrix + +| Scenario | PerfLog | StatsD | OpenTelemetry | +| -------------------------------- | ------- | ------ | ------------- | +| "How many TXs per second?" | ❌ | ✅ | ❌ | +| "Why was this specific TX slow?" | ⚠️ | ❌ | ✅ | +| "Which node delayed consensus?" | ❌ | ❌ | ✅ | +| "Show TX journey across 5 nodes" | ❌ | ❌ | ✅ | + +> **Key Insight**: OpenTelemetry **complements** (not replaces) existing systems. + +--- + +## Slide 4: Architecture + +### High-Level Integration Architecture + +```mermaid +flowchart TB + subgraph rippled["rippled Node"] + subgraph services["Core Services"] + direction LR + RPC["RPC Server
(HTTP/WS)"] ~~~ Overlay["Overlay
(P2P Network)"] ~~~ Consensus["Consensus
(RCLConsensus)"] + end + + Telemetry["Telemetry Module
(OpenTelemetry SDK)"] + + services --> Telemetry + end + + Telemetry -->|OTLP/gRPC| Collector["OTel Collector"] + + Collector --> Tempo["Grafana Tempo"] + Collector --> Jaeger["Jaeger"] + Collector --> Elastic["Elastic APM"] + + style rippled fill:#424242,stroke:#212121,color:#fff + style services fill:#1565c0,stroke:#0d47a1,color:#fff + style Telemetry fill:#2e7d32,stroke:#1b5e20,color:#fff + style Collector fill:#e65100,stroke:#bf360c,color:#fff +``` + +### Context Propagation + +```mermaid +sequenceDiagram + participant Client + participant NodeA as Node A + participant NodeB as Node B + + Client->>NodeA: Submit TX (no context) + Note over NodeA: Creates trace_id: abc123
span: tx.receive + NodeA->>NodeB: Relay TX
(traceparent: abc123) + Note over NodeB: Links to trace_id: abc123
span: tx.relay +``` + +- **HTTP/RPC**: W3C Trace Context headers (`traceparent`) +- **P2P Messages**: Protocol Buffer extension fields + +--- + +## Slide 5: Implementation Plan + +### 5-Phase Rollout (9 Weeks) + +```mermaid +gantt + title Implementation Timeline + dateFormat YYYY-MM-DD + axisFormat Week %W + + section Phase 1 + Core Infrastructure :p1, 2024-01-01, 2w + + section Phase 2 + RPC Tracing :p2, after p1, 2w + + section Phase 3 + Transaction Tracing :p3, after p2, 2w + + section Phase 4 + Consensus Tracing :p4, after p3, 2w + + section Phase 5 + Documentation :p5, after p4, 1w +``` + +### Phase Details + +| Phase | Focus | Key Deliverables | Effort | +| ----- | ------------------- | -------------------------------------------- | ------- | +| 1 | Core Infrastructure | SDK integration, Telemetry interface, Config | 10 days | +| 2 | RPC Tracing | HTTP context extraction, Handler spans | 10 days | +| 3 | Transaction Tracing | Protobuf context, P2P relay propagation | 10 days | +| 4 | Consensus Tracing | Round spans, Proposal/validation tracing | 10 days | +| 5 | Documentation | Runbook, Dashboards, Training | 7 days | + +**Total Effort**: ~47 developer-days (2 developers) + +--- + +## Slide 6: Performance Overhead + +### Estimated System Impact + +| Metric | Overhead | Notes | +| ----------------- | ---------- | ----------------------------------- | +| **CPU** | 1-3% | Span creation and attribute setting | +| **Memory** | 2-5 MB | Batch buffer for pending spans | +| **Network** | 10-50 KB/s | Compressed OTLP export to collector | +| **Latency (p99)** | <2% | With proper sampling configuration | + +### Per-Message Overhead (Context Propagation) + +Each P2P message carries trace context with the following overhead: + +| Field | Size | Description | +| ------------- | ------------- | ----------------------------------------- | +| `trace_id` | 16 bytes | Unique identifier for the entire trace | +| `span_id` | 8 bytes | Current span (becomes parent on receiver) | +| `trace_flags` | 4 bytes | Sampling decision flags | +| `trace_state` | 0-4 bytes | Optional vendor-specific data | +| **Total** | **~32 bytes** | **Added per traced P2P message** | + +```mermaid +flowchart LR + subgraph msg["P2P Message with Trace Context"] + A["Original Message
(variable size)"] --> B["+ TraceContext
(~32 bytes)"] + end + + subgraph breakdown["Context Breakdown"] + C["trace_id
16 bytes"] + D["span_id
8 bytes"] + E["flags
4 bytes"] + F["state
0-4 bytes"] + end + + B --> breakdown + + style A fill:#424242,stroke:#212121,color:#fff + style B fill:#2e7d32,stroke:#1b5e20,color:#fff + style C fill:#1565c0,stroke:#0d47a1,color:#fff + style D fill:#1565c0,stroke:#0d47a1,color:#fff + style E fill:#e65100,stroke:#bf360c,color:#fff + style F fill:#4a148c,stroke:#2e0d57,color:#fff +``` + +> **Note**: 32 bytes is negligible compared to typical transaction messages (hundreds to thousands of bytes) + +### Mitigation Strategies + +```mermaid +flowchart LR + A["Head Sampling
10% default"] --> B["Tail Sampling
Keep errors/slow"] --> C["Batch Export
Reduce I/O"] --> D["Conditional Compile
XRPL_ENABLE_TELEMETRY"] + + style A fill:#1565c0,stroke:#0d47a1,color:#fff + style B fill:#2e7d32,stroke:#1b5e20,color:#fff + style C fill:#e65100,stroke:#bf360c,color:#fff + style D fill:#4a148c,stroke:#2e0d57,color:#fff +``` + +### Kill Switches (Rollback Options) + +1. **Config Disable**: Set `enabled=0` in config → instant disable, no restart needed for sampling +2. **Rebuild**: Compile with `XRPL_ENABLE_TELEMETRY=OFF` → zero overhead (no-op) +3. **Full Revert**: Clean separation allows easy commit reversion + +--- + +## Slide 7: Data Collection & Privacy + +### What Data is Collected + +| Category | Attributes Collected | Purpose | +| --------------- | ---------------------------------------------------------------------------------- | --------------------------- | +| **Transaction** | `tx.hash`, `tx.type`, `tx.result`, `tx.fee`, `ledger_index` | Trace transaction lifecycle | +| **Consensus** | `round`, `phase`, `mode`, `proposers`(public key or public node id), `duration_ms` | Analyze consensus timing | +| **RPC** | `command`, `version`, `status`, `duration_ms` | Monitor RPC performance | +| **Peer** | `peer.id`(public key), `latency_ms`, `message.type`, `message.size` | Network topology analysis | +| **Ledger** | `ledger.hash`, `ledger.index`, `close_time`, `tx_count` | Ledger progression tracking | +| **Job** | `job.type`, `queue_ms`, `worker` | JobQueue performance | + +### What is NOT Collected (Privacy Guarantees) + +```mermaid +flowchart LR + subgraph notCollected["❌ NOT Collected"] + direction LR + A["Private Keys"] ~~~ B["Account Balances"] ~~~ C["Transaction Amounts"] + end + + subgraph alsoNot["❌ Also Excluded"] + direction LR + D["IP Addresses
(configurable)"] ~~~ E["Personal Data"] ~~~ F["Raw TX Payloads"] + end + + style A fill:#c62828,stroke:#8c2809,color:#fff + style B fill:#c62828,stroke:#8c2809,color:#fff + style C fill:#c62828,stroke:#8c2809,color:#fff + style D fill:#c62828,stroke:#8c2809,color:#fff + style E fill:#c62828,stroke:#8c2809,color:#fff + style F fill:#c62828,stroke:#8c2809,color:#fff +``` + +### Privacy Protection Mechanisms + +| Mechanism | Description | +| -------------------------- | ------------------------------------------------------------- | +| **Account Hashing** | `xrpl.tx.account` is hashed at collector level before storage | +| **Configurable Redaction** | Sensitive fields can be excluded via config | +| **Sampling** | Only 10% of traces recorded by default (reduces exposure) | +| **Local Control** | Node operators control what gets exported | +| **No Raw Payloads** | Transaction content is never recorded, only metadata | + +> **Key Principle**: Telemetry collects **operational metadata** (timing, counts, hashes) — never **sensitive content** (keys, balances, amounts). + +--- + +_End of Presentation_ diff --git a/src/xrpld/rpc/detail/RPCHandler.cpp b/src/xrpld/rpc/detail/RPCHandler.cpp index eea24ab0c0..c12251eaaf 100644 --- a/src/xrpld/rpc/detail/RPCHandler.cpp +++ b/src/xrpld/rpc/detail/RPCHandler.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -157,6 +158,11 @@ template Status callMethod(JsonContext& context, Method method, std::string const& name, Object& result) { + XRPL_TRACE_RPC(context.app.getTelemetry(), "rpc.command." + name); + XRPL_TRACE_SET_ATTR("xrpl.rpc.command", name.c_str()); + XRPL_TRACE_SET_ATTR("xrpl.rpc.version", static_cast(context.apiVersion)); + XRPL_TRACE_SET_ATTR("xrpl.rpc.role", (context.role == Role::ADMIN ? "admin" : "user")); + static std::atomic requestId{0}; auto& perfLog = context.app.getPerfLog(); std::uint64_t const curId = ++requestId; @@ -172,12 +178,15 @@ callMethod(JsonContext& context, Method method, std::string const& name, Object& JLOG(context.j.debug()) << "RPC call " << name << " completed in " << ((end - start).count() / 1000000000.0) << "seconds"; perfLog.rpcFinish(name, curId); + XRPL_TRACE_SET_ATTR("xrpl.rpc.status", "success"); return ret; } catch (std::exception& e) { perfLog.rpcError(name, curId); JLOG(context.j.info()) << "Caught throw: " << e.what(); + XRPL_TRACE_EXCEPTION(e); + XRPL_TRACE_SET_ATTR("xrpl.rpc.status", "error"); if (context.loadType == Resource::feeReferenceRPC) context.loadType = Resource::feeExceptionRPC; diff --git a/src/xrpld/rpc/detail/ServerHandler.cpp b/src/xrpld/rpc/detail/ServerHandler.cpp index 64c81ccc0a..757b33e825 100644 --- a/src/xrpld/rpc/detail/ServerHandler.cpp +++ b/src/xrpld/rpc/detail/ServerHandler.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -46,7 +47,7 @@ static bool isStatusRequest(http_request_type const& request) { return request.version() >= 11 && request.target() == "/" && request.body().size() == 0 && - request.method() == boost::beast::http::verb::get; + request.method() == boost::beast::http::verb::get; } static Handoff @@ -78,23 +79,22 @@ authorized(Port const& port, std::map const& h) return false; std::string strUserPass64 = it->second.substr(6); boost::trim(strUserPass64); - std::string strUserPass = base64_decode(strUserPass64); + std::string strUserPass = base64_decode(strUserPass64); std::string::size_type nColon = strUserPass.find(":"); if (nColon == std::string::npos) return false; - std::string strUser = strUserPass.substr(0, nColon); + std::string strUser = strUserPass.substr(0, nColon); std::string strPassword = strUserPass.substr(nColon + 1); return strUser == port.user && strPassword == port.password; } -ServerHandler::ServerHandler( - ServerHandlerCreator const&, - Application& app, - boost::asio::io_context& io_context, - JobQueue& jobQueue, - NetworkOPs& networkOPs, - Resource::Manager& resourceManager, - CollectorManager& cm) +ServerHandler::ServerHandler(ServerHandlerCreator const&, + Application& app, + boost::asio::io_context& io_context, + JobQueue& jobQueue, + NetworkOPs& networkOPs, + Resource::Manager& resourceManager, + CollectorManager& cm) : app_(app) , m_resourceManager(resourceManager) , m_journal(app_.getJournal("Server")) @@ -104,8 +104,8 @@ ServerHandler::ServerHandler( { auto const& group(cm.group("rpc")); rpc_requests_ = group->make_counter("requests"); - rpc_size_ = group->make_event("size"); - rpc_time_ = group->make_event("time"); + rpc_size_ = group->make_event("size"); + rpc_time_ = group->make_event("time"); } ServerHandler::~ServerHandler() @@ -116,7 +116,7 @@ ServerHandler::~ServerHandler() void ServerHandler::setup(Setup const& setup, beast::Journal journal) { - setup_ = setup; + setup_ = setup; endpoints_ = m_server->ports(setup.ports); // fix auto ports @@ -146,7 +146,11 @@ ServerHandler::stop() m_server->close(); { std::unique_lock lock(mutex_); - condition_.wait(lock, [this] { return stopped_; }); + condition_.wait(lock, + [this] + { + return stopped_; + }); } } @@ -157,7 +161,8 @@ ServerHandler::onAccept(Session& session, boost::asio::ip::tcp::endpoint endpoin { auto const& port = session.port(); - auto const c = [this, &port]() { + auto const c = [this, &port]() + { std::lock_guard lock(mutex_); return ++count_[port]; }(); @@ -172,16 +177,15 @@ ServerHandler::onAccept(Session& session, boost::asio::ip::tcp::endpoint endpoin } Handoff -ServerHandler::onHandoff( - Session& session, - std::unique_ptr&& bundle, - http_request_type&& request, - boost::asio::ip::tcp::endpoint const& remote_address) +ServerHandler::onHandoff(Session& session, + std::unique_ptr&& bundle, + http_request_type&& request, + boost::asio::ip::tcp::endpoint const& remote_address) { using namespace boost::beast; auto const& p{session.port().protocol}; - bool const is_ws{ - p.count("ws") > 0 || p.count("ws2") > 0 || p.count("wss") > 0 || p.count("wss2") > 0}; + bool const is_ws{p.count("ws") > 0 || p.count("ws2") > 0 || p.count("wss") > 0 || + p.count("wss2") > 0}; if (websocket::is_upgrade(request)) { @@ -201,14 +205,16 @@ ServerHandler::onHandoff( auto is{std::make_shared(m_networkOPs, ws)}; auto const beast_remote_address = beast::IPAddressConversion::from_asio(remote_address); - is->getConsumer() = requestInboundEndpoint( - m_resourceManager, - beast_remote_address, - requestRole( - Role::GUEST, session.port(), Json::Value(), beast_remote_address, is->user()), - is->user(), - is->forwarded_for()); - ws->appDefined = std::move(is); + is->getConsumer() = requestInboundEndpoint(m_resourceManager, + beast_remote_address, + requestRole(Role::GUEST, + session.port(), + Json::Value(), + beast_remote_address, + is->user()), + is->user(), + is->forwarded_for()); + ws->appDefined = std::move(is); ws->run(); Handoff handoff; @@ -229,7 +235,10 @@ ServerHandler::onHandoff( static inline Json::Output makeOutput(Session& session) { - return [&](boost::beast::string_view const& b) { session.write(b.data(), b.size()); }; + return [&](boost::beast::string_view const& b) + { + session.write(b.data(), b.size()); + }; } static std::map @@ -241,9 +250,13 @@ build_map(boost::beast::http::fields const& h) // key cannot be a std::string_view because it needs to be used in // map and along with iterators std::string key(e.name_string()); - std::transform(key.begin(), key.end(), key.begin(), [](auto kc) { - return std::tolower(static_cast(kc)); - }); + std::transform(key.begin(), + key.end(), + key.begin(), + [](auto kc) + { + return std::tolower(static_cast(kc)); + }); c[key] = e.value(); } return c; @@ -266,6 +279,8 @@ buffers_to_string(ConstBufferSequence const& bs) void ServerHandler::onRequest(Session& session) { + XRPL_TRACE_RPC(app_.getTelemetry(), "rpc.request"); + // Make sure RPC is enabled on the port if (session.port().protocol.count("http") == 0 && session.port().protocol.count("https") == 0) { @@ -283,10 +298,13 @@ ServerHandler::onRequest(Session& session) } std::shared_ptr detachedSession = session.detach(); - auto const postResult = m_jobQueue.postCoro( - jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr coro) { - processSession(detachedSession, coro); - }); + auto const postResult = + m_jobQueue.postCoro(jtCLIENT_RPC, + "RPC-Client", + [this, detachedSession](std::shared_ptr coro) + { + processSession(detachedSession, coro); + }); if (postResult == nullptr) { // The coroutine was rejected, probably because we're shutting down. @@ -297,22 +315,24 @@ ServerHandler::onRequest(Session& session) } void -ServerHandler::onWSMessage( - std::shared_ptr session, - std::vector const& buffers) +ServerHandler::onWSMessage(std::shared_ptr session, + std::vector const& buffers) { Json::Value jv; auto const size = boost::asio::buffer_size(buffers); if (size > RPC::Tuning::maxRequestSize || !Json::Reader{}.parse(jv, buffers) || !jv.isObject()) { Json::Value jvResult(Json::objectValue); - jvResult[jss::type] = jss::error; + jvResult[jss::type] = jss::error; jvResult[jss::error] = "jsonInvalid"; jvResult[jss::value] = buffers_to_string(buffers); boost::beast::multi_buffer sb; - Json::stream(jvResult, [&sb](auto const p, auto const n) { - sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(p, n))); - }); + Json::stream( + jvResult, + [&sb](auto const p, auto const n) + { + sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(p, n))); + }); JLOG(m_journal.trace()) << "Websocket sending '" << jvResult << "'"; session->send(std::make_shared>(std::move(sb))); session->complete(); @@ -324,10 +344,11 @@ ServerHandler::onWSMessage( auto const postResult = m_jobQueue.postCoro( jtCLIENT_WEBSOCKET, "WS-Client", - [this, session, jv = std::move(jv)](std::shared_ptr const& coro) { + [this, session, jv = std::move(jv)](std::shared_ptr const& coro) + { auto const jr = this->processSession(session, coro, jv); - auto const s = to_string(jr); - auto const n = s.length(); + auto const s = to_string(jr); + auto const n = s.length(); boost::beast::multi_buffer sb(n); sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n))); session->send(std::make_shared>(std::move(sb))); @@ -362,7 +383,8 @@ void logDuration(Json::Value const& request, T const& duration, beast::Journal& journal) { using namespace std::chrono_literals; - auto const level = [&]() { + auto const level = [&]() + { if (duration >= 10s) return journal.error(); if (duration >= 1s) @@ -376,11 +398,11 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ } Json::Value -ServerHandler::processSession( - std::shared_ptr const& session, - std::shared_ptr const& coro, - Json::Value const& jv) +ServerHandler::processSession(std::shared_ptr const& session, + std::shared_ptr const& coro, + Json::Value const& jv) { + XRPL_TRACE_RPC(app_.getTelemetry(), "rpc.ws_message"); auto is = std::static_pointer_cast(session->appDefined); if (is->getConsumer().disconnect(m_journal)) { @@ -403,10 +425,10 @@ ServerHandler::processSession( (jv.isMember(jss::command) && jv.isMember(jss::method) && jv[jss::command].asString() != jv[jss::method].asString())) { - jr[jss::type] = jss::response; - jr[jss::status] = jss::error; - jr[jss::error] = apiVersion == RPC::apiInvalidVersion ? jss::invalid_API_version - : jss::missingCommand; + jr[jss::type] = jss::response; + jr[jss::status] = jss::error; + jr[jss::error] = apiVersion == RPC::apiInvalidVersion ? jss::invalid_API_version + : jss::missingCommand; jr[jss::request] = jv; if (jv.isMember(jss::id)) jr[jss::id] = jv[jss::id]; @@ -425,32 +447,30 @@ ServerHandler::processSession( apiVersion, app_.config().BETA_RPC_API, jv.isMember(jss::command) ? jv[jss::command].asString() : jv[jss::method].asString()); - auto role = requestRole( - required, - session->port(), - jv, - beast::IP::from_asio(session->remote_endpoint().address()), - is->user()); + auto role = requestRole(required, + session->port(), + jv, + beast::IP::from_asio(session->remote_endpoint().address()), + is->user()); if (Role::FORBID == role) { - loadType = Resource::feeMalformedRPC; + loadType = Resource::feeMalformedRPC; jr[jss::result] = rpcError(rpcFORBIDDEN); } else { - RPC::JsonContext context{ - {app_.getJournal("RPCHandler"), - app_, - loadType, - app_.getOPs(), - app_.getLedgerMaster(), - is->getConsumer(), - role, - coro, - is, - apiVersion}, - jv, - {is->user(), is->forwarded_for()}}; + RPC::JsonContext context{{app_.getJournal("RPCHandler"), + app_, + loadType, + app_.getOPs(), + app_.getLedgerMaster(), + is->getConsumer(), + role, + coro, + is, + apiVersion}, + jv, + {is->user(), is->forwarded_for()}}; auto start = std::chrono::system_clock::now(); RPC::doCommand(context, jr[jss::result]); @@ -478,7 +498,7 @@ ServerHandler::processSession( // Regularize result. This is duplicate code. if (jr[jss::result].isMember(jss::error)) { - jr = jr[jss::result]; + jr = jr[jss::result]; jr[jss::status] = jss::error; auto rq = jv; @@ -519,23 +539,22 @@ ServerHandler::processSession( // Run as a coroutine. void -ServerHandler::processSession( - std::shared_ptr const& session, - std::shared_ptr coro) +ServerHandler::processSession(std::shared_ptr const& session, + std::shared_ptr coro) { - processRequest( - session->port(), - buffers_to_string(session->request().body().data()), - session->remoteAddress().at_port(0), - makeOutput(*session), - coro, - forwardedFor(session->request()), - [&] { - auto const iter = session->request().find("X-User"); - if (iter != session->request().end()) - return iter->value(); - return boost::beast::string_view{}; - }()); + processRequest(session->port(), + buffers_to_string(session->request().body().data()), + session->remoteAddress().at_port(0), + makeOutput(*session), + coro, + forwardedFor(session->request()), + [&] + { + auto const iter = session->request().find("X-User"); + if (iter != session->request().end()) + return iter->value(); + return boost::beast::string_view{}; + }()); if (beast::rfc2616::is_keep_alive(session->request())) { @@ -551,28 +570,28 @@ static Json::Value make_json_error(Json::Int code, Json::Value&& message) { Json::Value sub{Json::objectValue}; - sub["code"] = code; + sub["code"] = code; sub["message"] = std::move(message); Json::Value r{Json::objectValue}; r["error"] = sub; return r; } -Json::Int constexpr method_not_found = -32601; +Json::Int constexpr method_not_found = -32601; Json::Int constexpr server_overloaded = -32604; -Json::Int constexpr forbidden = -32605; -Json::Int constexpr wrong_version = -32606; +Json::Int constexpr forbidden = -32605; +Json::Int constexpr wrong_version = -32606; void -ServerHandler::processRequest( - Port const& port, - std::string const& request, - beast::IP::Endpoint const& remoteIPAddress, - Output const& output, - std::shared_ptr coro, - std::string_view forwardedFor, - std::string_view user) +ServerHandler::processRequest(Port const& port, + std::string const& request, + beast::IP::Endpoint const& remoteIPAddress, + Output const& output, + std::shared_ptr coro, + std::string_view forwardedFor, + std::string_view user) { + XRPL_TRACE_RPC(app_.getTelemetry(), "rpc.process"); auto rpcJ = app_.getJournal("RPC"); Json::Value jsonOrig; @@ -581,16 +600,15 @@ ServerHandler::processRequest( if ((request.size() > RPC::Tuning::maxRequestSize) || !reader.parse(request, jsonOrig) || !jsonOrig || !jsonOrig.isObject()) { - HTTPReply( - 400, - "Unable to parse request: " + reader.getFormattedErrorMessages(), - output, - rpcJ); + HTTPReply(400, + "Unable to parse request: " + reader.getFormattedErrorMessages(), + output, + rpcJ); return; } } - bool batch = false; + bool batch = false; unsigned size = 1; if (jsonOrig.isMember(jss::method) && jsonOrig[jss::method] == "batch") { @@ -613,7 +631,7 @@ ServerHandler::processRequest( { Json::Value r(Json::objectValue); r[jss::request] = jsonRPC; - r[jss::error] = make_json_error(method_not_found, "Method not found"); + r[jss::error] = make_json_error(method_not_found, "Method not found"); reply.append(r); continue; } @@ -622,8 +640,8 @@ ServerHandler::processRequest( if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() && jsonRPC[jss::params].size() > 0 && jsonRPC[jss::params][0u].isObject()) { - apiVersion = RPC::getAPIVersionNumber( - jsonRPC[jss::params][Json::UInt(0)], app_.config().BETA_RPC_API); + apiVersion = RPC::getAPIVersionNumber(jsonRPC[jss::params][Json::UInt(0)], + app_.config().BETA_RPC_API); } if (apiVersion == RPC::apiVersionIfUnspecified && batch) @@ -641,25 +659,29 @@ ServerHandler::processRequest( } Json::Value r(Json::objectValue); r[jss::request] = jsonRPC; - r[jss::error] = make_json_error(wrong_version, jss::invalid_API_version.c_str()); + r[jss::error] = make_json_error(wrong_version, jss::invalid_API_version.c_str()); reply.append(r); continue; } /* ------------------------------------------------------------------ */ - auto role = Role::FORBID; + auto role = Role::FORBID; auto required = Role::FORBID; if (jsonRPC.isMember(jss::method) && jsonRPC[jss::method].isString()) { - required = RPC::roleRequired( - apiVersion, app_.config().BETA_RPC_API, jsonRPC[jss::method].asString()); + required = RPC::roleRequired(apiVersion, + app_.config().BETA_RPC_API, + jsonRPC[jss::method].asString()); } if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() && jsonRPC[jss::params].size() > 0 && jsonRPC[jss::params][Json::UInt(0)].isObjectOrNull()) { - role = requestRole( - required, port, jsonRPC[jss::params][Json::UInt(0)], remoteIPAddress, user); + role = requestRole(required, + port, + jsonRPC[jss::params][Json::UInt(0)], + remoteIPAddress, + user); } else { @@ -673,8 +695,9 @@ ServerHandler::processRequest( } else { - usage = m_resourceManager.newInboundEndpoint( - remoteIPAddress, role == Role::PROXY, forwardedFor); + usage = m_resourceManager.newInboundEndpoint(remoteIPAddress, + role == Role::PROXY, + forwardedFor); if (usage.disconnect(m_journal)) { if (!batch) @@ -821,19 +844,18 @@ ServerHandler::processRequest( Resource::Charge loadType = Resource::feeReferenceRPC; - RPC::JsonContext context{ - {m_journal, - app_, - loadType, - m_networkOPs, - app_.getLedgerMaster(), - usage, - role, - coro, - InfoSub::pointer(), - apiVersion}, - params, - {user, forwardedFor}}; + RPC::JsonContext context{{m_journal, + app_, + loadType, + m_networkOPs, + app_.getLedgerMaster(), + usage, + role, + coro, + InfoSub::pointer(), + apiVersion}, + params, + {user, forwardedFor}}; Json::Value result; auto start = std::chrono::system_clock::now(); @@ -866,8 +888,8 @@ ServerHandler::processRequest( if (result.isMember(jss::error)) { result[jss::status] = jss::error; - result["code"] = result[jss::error_code]; - result["message"] = result[jss::error_message]; + result["code"] = result[jss::error_code]; + result["message"] = result[jss::error_message]; result.removeMember(jss::error_message); JLOG(m_journal.debug()) << "rpcError: " << result[jss::error] << ": " << result[jss::error_message]; @@ -876,7 +898,7 @@ ServerHandler::processRequest( else { result[jss::status] = jss::success; - r[jss::result] = std::move(result); + r[jss::result] = std::move(result); } } else @@ -899,7 +921,7 @@ ServerHandler::processRequest( rq[jss::seed_hex.c_str()] = ""; } - result[jss::status] = jss::error; + result[jss::status] = jss::error; result[jss::request] = rq; JLOG(m_journal.debug()) @@ -939,7 +961,8 @@ ServerHandler::processRequest( } // If we're returning an error_code, use that to determine the HTTP status. - int const httpStatus = [&reply]() { + int const httpStatus = [&reply]() + { // This feature is enabled with ripplerpc version 3.0 and above. // Before ripplerpc version 3.0 always return 200. if (reply.isMember(jss::ripplerpc) && reply[jss::ripplerpc].isString() && @@ -959,9 +982,8 @@ ServerHandler::processRequest( auto response = to_string(reply); - rpc_time_.notify( - std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start)); + rpc_time_.notify(std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start)); ++rpc_requests_; rpc_size_.notify(beast::insight::Event::value_type{response.size()}); @@ -1000,8 +1022,8 @@ ServerHandler::statusResponse(http_request_type const& request) const { msg.result(boost::beast::http::status::ok); msg.body() = "Test page for " + systemName() + - "

Test

This page shows " + systemName() + - " http(s) connectivity is working.

"; + "

Test

This page shows " + systemName() + + " http(s) connectivity is working.

"; } else { @@ -1071,19 +1093,19 @@ to_Port(ParsedPort const& parsed, std::ostream& log) } p.protocol = parsed.protocol; - p.user = parsed.user; - p.password = parsed.password; - p.admin_user = parsed.admin_user; - p.admin_password = parsed.admin_password; - p.ssl_key = parsed.ssl_key; - p.ssl_cert = parsed.ssl_cert; - p.ssl_chain = parsed.ssl_chain; - p.ssl_ciphers = parsed.ssl_ciphers; - p.pmd_options = parsed.pmd_options; - p.ws_queue_limit = parsed.ws_queue_limit; - p.limit = parsed.limit; - p.admin_nets_v4 = parsed.admin_nets_v4; - p.admin_nets_v6 = parsed.admin_nets_v6; + p.user = parsed.user; + p.password = parsed.password; + p.admin_user = parsed.admin_user; + p.admin_password = parsed.admin_password; + p.ssl_key = parsed.ssl_key; + p.ssl_cert = parsed.ssl_cert; + p.ssl_chain = parsed.ssl_chain; + p.ssl_ciphers = parsed.ssl_ciphers; + p.pmd_options = parsed.pmd_options; + p.ws_queue_limit = parsed.ws_queue_limit; + p.limit = parsed.limit; + p.admin_nets_v4 = parsed.admin_nets_v4; + p.admin_nets_v6 = parsed.admin_nets_v6; p.secure_gateway_nets_v4 = parsed.secure_gateway_nets_v4; p.secure_gateway_nets_v6 = parsed.secure_gateway_nets_v6; @@ -1146,9 +1168,12 @@ parse_Ports(Config const& config, std::ostream& log) } else { - auto const count = std::count_if(result.cbegin(), result.cend(), [](Port const& p) { - return p.protocol.count("peer") != 0; - }); + auto const count = std::count_if(result.cbegin(), + result.cend(), + [](Port const& p) + { + return p.protocol.count("peer") != 0; + }); if (count > 1) { @@ -1185,10 +1210,10 @@ setup_Client(ServerHandler::Setup& setup) { setup.client.ip = iter->ip.to_string(); } - setup.client.port = iter->port; - setup.client.user = iter->user; - setup.client.password = iter->password; - setup.client.admin_user = iter->admin_user; + setup.client.port = iter->port; + setup.client.user = iter->user; + setup.client.password = iter->password; + setup.client.admin_user = iter->admin_user; setup.client.admin_password = iter->admin_password; } @@ -1196,9 +1221,12 @@ setup_Client(ServerHandler::Setup& setup) static void setup_Overlay(ServerHandler::Setup& setup) { - auto const iter = std::find_if(setup.ports.cbegin(), setup.ports.cend(), [](Port const& port) { - return port.protocol.count("peer") != 0; - }); + auto const iter = std::find_if(setup.ports.cbegin(), + setup.ports.cend(), + [](Port const& port) + { + return port.protocol.count("peer") != 0; + }); if (iter == setup.ports.cend()) { setup.overlay = {}; @@ -1220,22 +1248,20 @@ setup_ServerHandler(Config const& config, std::ostream& log) } std::unique_ptr -make_ServerHandler( - Application& app, - boost::asio::io_context& io_context, - JobQueue& jobQueue, - NetworkOPs& networkOPs, - Resource::Manager& resourceManager, - CollectorManager& cm) +make_ServerHandler(Application& app, + boost::asio::io_context& io_context, + JobQueue& jobQueue, + NetworkOPs& networkOPs, + Resource::Manager& resourceManager, + CollectorManager& cm) { - return std::make_unique( - ServerHandler::ServerHandlerCreator(), - app, - io_context, - jobQueue, - networkOPs, - resourceManager, - cm); + return std::make_unique(ServerHandler::ServerHandlerCreator(), + app, + io_context, + jobQueue, + networkOPs, + resourceManager, + cm); } } // namespace xrpl diff --git a/src/xrpld/telemetry/TracingInstrumentation.h b/src/xrpld/telemetry/TracingInstrumentation.h new file mode 100644 index 0000000000..d7d2ecf912 --- /dev/null +++ b/src/xrpld/telemetry/TracingInstrumentation.h @@ -0,0 +1,115 @@ +#pragma once + +/** Convenience macros for instrumenting code with OpenTelemetry trace spans. + + When XRPL_ENABLE_TELEMETRY is defined, the macros create SpanGuard objects + that manage span lifetime via RAII. When not defined, all macros expand to + ((void)0) with zero overhead. + + Usage in instrumented code: + @code + XRPL_TRACE_RPC(app.getTelemetry(), "rpc.command." + name); + XRPL_TRACE_SET_ATTR("xrpl.rpc.command", name); + XRPL_TRACE_SET_ATTR("xrpl.rpc.status", "success"); + @endcode + + @note Macro parameter names use leading/trailing underscores + (e.g. _tel_obj_) to avoid colliding with identifiers in the macro body, + specifically the ::xrpl::telemetry:: namespace qualifier. +*/ + +#ifdef XRPL_ENABLE_TELEMETRY + +#include +#include + +#include + +namespace xrpl { +namespace telemetry { + +/** Start an unconditional span, ended when the guard goes out of scope. + @param _tel_obj_ Telemetry instance reference. + @param _span_name_ Span name string. +*/ +#define XRPL_TRACE_SPAN(_tel_obj_, _span_name_) \ + auto _xrpl_span_ = (_tel_obj_).startSpan(_span_name_); \ + ::xrpl::telemetry::SpanGuard _xrpl_guard_(_xrpl_span_) + +/** Start an unconditional span with a specific SpanKind. + @param _tel_obj_ Telemetry instance reference. + @param _span_name_ Span name string. + @param _span_kind_ opentelemetry::trace::SpanKind value. +*/ +#define XRPL_TRACE_SPAN_KIND(_tel_obj_, _span_name_, _span_kind_) \ + auto _xrpl_span_ = (_tel_obj_).startSpan(_span_name_, _span_kind_); \ + ::xrpl::telemetry::SpanGuard _xrpl_guard_(_xrpl_span_) + +/** Conditionally start a span for RPC tracing. + The span is only created if shouldTraceRpc() returns true. + @param _tel_obj_ Telemetry instance reference. + @param _span_name_ Span name string. +*/ +#define XRPL_TRACE_RPC(_tel_obj_, _span_name_) \ + std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_; \ + if ((_tel_obj_).shouldTraceRpc()) \ + { \ + _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_)); \ + } + +/** Conditionally start a span for transaction tracing. + The span is only created if shouldTraceTransactions() returns true. + @param _tel_obj_ Telemetry instance reference. + @param _span_name_ Span name string. +*/ +#define XRPL_TRACE_TX(_tel_obj_, _span_name_) \ + std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_; \ + if ((_tel_obj_).shouldTraceTransactions()) \ + { \ + _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_)); \ + } + +/** Conditionally start a span for consensus tracing. + The span is only created if shouldTraceConsensus() returns true. + @param _tel_obj_ Telemetry instance reference. + @param _span_name_ Span name string. +*/ +#define XRPL_TRACE_CONSENSUS(_tel_obj_, _span_name_) \ + std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_; \ + if ((_tel_obj_).shouldTraceConsensus()) \ + { \ + _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_)); \ + } + +/** Set a key-value attribute on the current span (if it exists). + Must be used after one of the XRPL_TRACE_* span macros. +*/ +#define XRPL_TRACE_SET_ATTR(key, value) \ + if (_xrpl_guard_.has_value()) \ + { \ + _xrpl_guard_->setAttribute(key, value); \ + } + +/** Record an exception on the current span and mark it as error. + Must be used after one of the XRPL_TRACE_* span macros. +*/ +#define XRPL_TRACE_EXCEPTION(e) \ + if (_xrpl_guard_.has_value()) \ + { \ + _xrpl_guard_->recordException(e); \ + } + +} // namespace telemetry +} // namespace xrpl + +#else // XRPL_ENABLE_TELEMETRY not defined + +#define XRPL_TRACE_SPAN(_tel_obj_, _span_name_) ((void)0) +#define XRPL_TRACE_SPAN_KIND(_tel_obj_, _span_name_, _span_kind_) ((void)0) +#define XRPL_TRACE_RPC(_tel_obj_, _span_name_) ((void)0) +#define XRPL_TRACE_TX(_tel_obj_, _span_name_) ((void)0) +#define XRPL_TRACE_CONSENSUS(_tel_obj_, _span_name_) ((void)0) +#define XRPL_TRACE_SET_ATTR(key, value) ((void)0) +#define XRPL_TRACE_EXCEPTION(e) ((void)0) + +#endif // XRPL_ENABLE_TELEMETRY