Phase 1c: RPC integration - ServerHandler tracing, telemetry config wiring

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pratik Mankawde
2026-03-20 17:22:12 +00:00
parent 405886719c
commit 8707cc7d48
5 changed files with 617 additions and 185 deletions


@@ -268,6 +268,7 @@ xrpld.perflog > xrpl.json
xrpld.rpc > xrpl.basics
xrpld.rpc > xrpl.core
xrpld.rpc > xrpld.core
xrpld.rpc > xrpld.telemetry
xrpld.rpc > xrpl.json
xrpld.rpc > xrpl.ledger
xrpld.rpc > xrpl.net
@@ -278,3 +279,4 @@ xrpld.rpc > xrpl.resource
xrpld.rpc > xrpl.server
xrpld.rpc > xrpl.tx
xrpld.shamap > xrpl.shamap
xrpld.telemetry > xrpl.telemetry

280
presentation.md Normal file

@@ -0,0 +1,280 @@
# OpenTelemetry Distributed Tracing for rippled
---
## Slide 1: Introduction
### What is OpenTelemetry?
OpenTelemetry is an open-source, CNCF-backed observability framework for distributed tracing, metrics, and logs.
### Why OpenTelemetry for rippled?
- **End-to-End Transaction Visibility**: Track transactions from submission → consensus → ledger inclusion
- **Cross-Node Correlation**: Follow requests across multiple independent nodes using a unique `trace_id`
- **Consensus Round Analysis**: Understand timing and behavior across validators
- **Incident Debugging**: Correlate events across distributed nodes during issues
```mermaid
flowchart LR
A["Node A<br/>tx.receive<br/>trace_id: abc123"] --> B["Node B<br/>tx.relay<br/>trace_id: abc123"] --> C["Node C<br/>tx.validate<br/>trace_id: abc123"] --> D["Node D<br/>ledger.apply<br/>trace_id: abc123"]
style A fill:#1565c0,stroke:#0d47a1,color:#fff
style B fill:#2e7d32,stroke:#1b5e20,color:#fff
style C fill:#2e7d32,stroke:#1b5e20,color:#fff
style D fill:#e65100,stroke:#bf360c,color:#fff
```
> **Trace ID: abc123** — All nodes share the same trace, enabling cross-node correlation.
---
## Slide 2: OpenTelemetry vs Open Source Alternatives
| Feature | OpenTelemetry | Jaeger | Zipkin | SkyWalking | Pinpoint | Prometheus |
| ------------------- | ---------------- | ---------------- | ------------------ | ---------- | ---------- | ---------- |
| **Tracing** | YES | YES | YES | YES | YES | NO |
| **Metrics** | YES | NO | NO | YES | YES | YES |
| **Logs** | YES | NO | NO | YES | NO | NO |
| **C++ SDK** | YES Official | YES (Deprecated) | YES (Unmaintained) | NO | NO | YES |
| **Vendor Neutral** | YES Primary goal | NO | NO | NO | NO | NO |
| **Instrumentation** | Manual + Auto | Manual | Manual | Auto-first | Auto-first | Manual |
| **Backend** | Any (exporters) | Self | Self | Self | Self | Self |
| **CNCF Status**     | Incubating       | Graduated        | NO                 | NO         | NO         | Graduated  |
> **Why OpenTelemetry?** It's the only actively maintained, full-featured C++ option with vendor neutrality — allowing export to Jaeger, Prometheus, Grafana, or any commercial backend without changing instrumentation.
---
## Slide 3: Comparison with rippled's Existing Solutions
### Current Observability Stack
| Aspect | PerfLog (JSON) | StatsD (Metrics) | OpenTelemetry (NEW) |
| --------------------- | --------------------- | --------------------- | --------------------------- |
| **Type** | Logging | Metrics | Distributed Tracing |
| **Scope** | Single node | Single node | **Cross-node** |
| **Data** | JSON log entries | Counters, gauges | Spans with context |
| **Correlation** | By timestamp | By metric name | By `trace_id` |
| **Overhead** | Low (file I/O) | Low (UDP) | Low-Medium (configurable) |
| **Question Answered** | "What happened here?" | "How many? How fast?" | **"What was the journey?"** |
### Use Case Matrix
| Scenario | PerfLog | StatsD | OpenTelemetry |
| -------------------------------- | ------- | ------ | ------------- |
| "How many TXs per second?" | ❌ | ✅ | ❌ |
| "Why was this specific TX slow?" | ⚠️ | ❌ | ✅ |
| "Which node delayed consensus?" | ❌ | ❌ | ✅ |
| "Show TX journey across 5 nodes" | ❌ | ❌ | ✅ |
> **Key Insight**: OpenTelemetry **complements** (not replaces) existing systems.
---
## Slide 4: Architecture
### High-Level Integration Architecture
```mermaid
flowchart TB
subgraph rippled["rippled Node"]
subgraph services["Core Services"]
direction LR
RPC["RPC Server<br/>(HTTP/WS)"] ~~~ Overlay["Overlay<br/>(P2P Network)"] ~~~ Consensus["Consensus<br/>(RCLConsensus)"]
end
Telemetry["Telemetry Module<br/>(OpenTelemetry SDK)"]
services --> Telemetry
end
Telemetry -->|OTLP/gRPC| Collector["OTel Collector"]
Collector --> Tempo["Grafana Tempo"]
Collector --> Jaeger["Jaeger"]
Collector --> Elastic["Elastic APM"]
style rippled fill:#424242,stroke:#212121,color:#fff
style services fill:#1565c0,stroke:#0d47a1,color:#fff
style Telemetry fill:#2e7d32,stroke:#1b5e20,color:#fff
style Collector fill:#e65100,stroke:#bf360c,color:#fff
```
### Context Propagation
```mermaid
sequenceDiagram
participant Client
participant NodeA as Node A
participant NodeB as Node B
Client->>NodeA: Submit TX (no context)
Note over NodeA: Creates trace_id: abc123<br/>span: tx.receive
NodeA->>NodeB: Relay TX<br/>(traceparent: abc123)
Note over NodeB: Links to trace_id: abc123<br/>span: tx.relay
```
- **HTTP/RPC**: W3C Trace Context headers (`traceparent`)
- **P2P Messages**: Protocol Buffer extension fields
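
The HTTP/RPC bullet above relies on the W3C `traceparent` header, whose version-00 layout is `00-<trace-id>-<parent-id>-<flags>`. The following is a minimal sketch of parsing and re-emitting that header, not the code from this commit: `TraceParent`, `parseTraceParent`, and `formatTraceParent` are hypothetical names, and the `abc123` value in the diagram is shorthand for a full 32-character hex trace ID.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical holder for the fields of a version-00 `traceparent` header.
struct TraceParent
{
    std::string traceId;   // 32 lowercase hex characters (16 bytes)
    std::string parentId;  // 16 lowercase hex characters (8 bytes)
    bool sampled = false;  // lowest bit of the trace-flags byte
};

// True when every character is a lowercase hex digit.
inline bool
isLowerHex(std::string_view s)
{
    for (char c : s)
        if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')))
            return false;
    return true;
}

// True when the field consists only of '0' characters (invalid per the spec).
inline bool
isAllZero(std::string_view s)
{
    return s.find_first_not_of('0') == std::string_view::npos;
}

// Parse "00-<trace-id>-<parent-id>-<flags>"; returns nullopt on malformed input.
inline std::optional<TraceParent>
parseTraceParent(std::string_view h)
{
    // 2 + 1 + 32 + 1 + 16 + 1 + 2 = 55 characters in total.
    if (h.size() != 55 || h[2] != '-' || h[35] != '-' || h[52] != '-')
        return std::nullopt;

    auto const version = h.substr(0, 2);
    auto const traceId = h.substr(3, 32);
    auto const parentId = h.substr(36, 16);
    auto const flags = h.substr(53, 2);

    if (version != "00" || !isLowerHex(traceId) || !isLowerHex(parentId) || !isLowerHex(flags))
        return std::nullopt;
    if (isAllZero(traceId) || isAllZero(parentId))
        return std::nullopt;

    // The sampled bit is the lowest bit of the second flags digit.
    char const low = flags[1];
    int const nibble = (low >= 'a') ? (low - 'a' + 10) : (low - '0');
    return TraceParent{std::string(traceId), std::string(parentId), (nibble & 1) != 0};
}

// Re-emit the header so the receiving node can join the same trace.
inline std::string
formatTraceParent(TraceParent const& tp)
{
    return "00-" + tp.traceId + "-" + tp.parentId + (tp.sampled ? "-01" : "-00");
}
```

A receiving node would call `parseTraceParent` on the incoming header, start its own span with the parsed `traceId`, and pass its own span ID as `parentId` when relaying. Only version `00` is handled here; a real parser would also need a policy for future versions.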
---
## Slide 5: Implementation Plan
### 5-Phase Rollout (9 Weeks)
```mermaid
gantt
title Implementation Timeline
dateFormat YYYY-MM-DD
axisFormat Week %W
section Phase 1
Core Infrastructure :p1, 2024-01-01, 2w
section Phase 2
RPC Tracing :p2, after p1, 2w
section Phase 3
Transaction Tracing :p3, after p2, 2w
section Phase 4
Consensus Tracing :p4, after p3, 2w
section Phase 5
Documentation :p5, after p4, 1w
```
### Phase Details
| Phase | Focus | Key Deliverables | Effort |
| ----- | ------------------- | -------------------------------------------- | ------- |
| 1 | Core Infrastructure | SDK integration, Telemetry interface, Config | 10 days |
| 2 | RPC Tracing | HTTP context extraction, Handler spans | 10 days |
| 3 | Transaction Tracing | Protobuf context, P2P relay propagation | 10 days |
| 4 | Consensus Tracing | Round spans, Proposal/validation tracing | 10 days |
| 5 | Documentation | Runbook, Dashboards, Training | 7 days |
**Total Effort**: ~47 developer-days (2 developers)
---
## Slide 6: Performance Overhead
### Estimated System Impact
| Metric | Overhead | Notes |
| ----------------- | ---------- | ----------------------------------- |
| **CPU** | 1-3% | Span creation and attribute setting |
| **Memory** | 2-5 MB | Batch buffer for pending spans |
| **Network** | 10-50 KB/s | Compressed OTLP export to collector |
| **Latency (p99)** | <2% | With proper sampling configuration |
### Per-Message Overhead (Context Propagation)
Each P2P message carries trace context with the following overhead:
| Field | Size | Description |
| ------------- | ------------- | ----------------------------------------- |
| `trace_id` | 16 bytes | Unique identifier for the entire trace |
| `span_id` | 8 bytes | Current span (becomes parent on receiver) |
| `trace_flags` | 4 bytes | Sampling decision flags |
| `trace_state` | 0-4 bytes | Optional vendor-specific data |
| **Total** | **~32 bytes** | **Added per traced P2P message** |
```mermaid
flowchart LR
subgraph msg["P2P Message with Trace Context"]
A["Original Message<br/>(variable size)"] --> B["+ TraceContext<br/>(~32 bytes)"]
end
subgraph breakdown["Context Breakdown"]
C["trace_id<br/>16 bytes"]
D["span_id<br/>8 bytes"]
E["flags<br/>4 bytes"]
F["state<br/>0-4 bytes"]
end
B --> breakdown
style A fill:#424242,stroke:#212121,color:#fff
style B fill:#2e7d32,stroke:#1b5e20,color:#fff
style C fill:#1565c0,stroke:#0d47a1,color:#fff
style D fill:#1565c0,stroke:#0d47a1,color:#fff
style E fill:#e65100,stroke:#bf360c,color:#fff
style F fill:#4a148c,stroke:#2e0d57,color:#fff
```
> **Note**: 32 bytes is negligible compared to typical transaction messages (hundreds to thousands of bytes)
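
The byte budget in the table can be checked with a few constants. This is a rough sketch under the table's own field sizes; the constant and function names are hypothetical, and it ignores the Protocol Buffer tag and length bytes that the wire encoding adds per field.

```cpp
#include <cstddef>

// Field sizes taken from the per-message overhead table (payload bytes only).
constexpr std::size_t kTraceIdBytes = 16;
constexpr std::size_t kSpanIdBytes = 8;
constexpr std::size_t kTraceFlagsBytes = 4;
constexpr std::size_t kMaxTraceStateBytes = 4;

// Total trace-context payload for a given optional trace_state length.
constexpr std::size_t
traceContextBytes(std::size_t traceStateBytes)
{
    return kTraceIdBytes + kSpanIdBytes + kTraceFlagsBytes + traceStateBytes;
}

static_assert(traceContextBytes(0) == 28, "minimum payload without trace_state");
static_assert(traceContextBytes(kMaxTraceStateBytes) == 32, "maximum payload");

// Overhead of the largest context relative to the original message size, in percent.
constexpr double
overheadPercent(std::size_t messageBytes)
{
    return 100.0 * static_cast<double>(traceContextBytes(kMaxTraceStateBytes)) /
        static_cast<double>(messageBytes);
}
```

For example, `overheadPercent(800)` evaluates to 4.0 and `overheadPercent(3200)` to 1.0, so the relative cost depends on the message size.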
### Mitigation Strategies
```mermaid
flowchart LR
A["Head Sampling<br/>10% default"] --> B["Tail Sampling<br/>Keep errors/slow"] --> C["Batch Export<br/>Reduce I/O"] --> D["Conditional Compile<br/>XRPL_ENABLE_TELEMETRY"]
style A fill:#1565c0,stroke:#0d47a1,color:#fff
style B fill:#2e7d32,stroke:#1b5e20,color:#fff
style C fill:#e65100,stroke:#bf360c,color:#fff
style D fill:#4a148c,stroke:#2e0d57,color:#fff
```
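
Head sampling only helps cross-node correlation if every node reaches the same decision for a given trace. One common way to get that is to derive the decision from the `trace_id` itself. The sketch below illustrates the idea with the 10% default from the diagram; `TraceId`, `thresholdFor`, and `shouldSample` are hypothetical names, and this is not necessarily how the OpenTelemetry SDK or this commit implements it.

```cpp
#include <array>
#include <cstdint>
#include <limits>

using TraceId = std::array<std::uint8_t, 16>;

// Map a sampling ratio in [0, 1] onto the 64-bit range.
inline std::uint64_t
thresholdFor(double ratio)
{
    constexpr auto maxValue = std::numeric_limits<std::uint64_t>::max();
    if (ratio <= 0.0)
        return 0;
    if (ratio >= 1.0)
        return maxValue;
    return static_cast<std::uint64_t>(ratio * static_cast<double>(maxValue));
}

// Deterministic decision: the same trace_id yields the same answer on every node.
inline bool
shouldSample(TraceId const& id, double ratio)
{
    if (ratio >= 1.0)
        return true;

    // Interpret the first 8 bytes of the trace_id as a big-endian integer.
    std::uint64_t value = 0;
    for (int i = 0; i < 8; ++i)
        value = (value << 8) | id[i];

    return value < thresholdFor(ratio);
}
```

Because the decision is a pure function of the trace ID and the ratio, nodes configured with the same ratio agree without exchanging any extra state. The sketch assumes trace IDs are uniformly random.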
### Kill Switches (Rollback Options)
1. **Config Disable**: Set `enabled=0` in config: instant disable, no restart needed for sampling
2. **Rebuild**: Compile with `XRPL_ENABLE_TELEMETRY=OFF`: zero overhead (no-op)
3. **Full Revert**: Clean separation allows easy commit reversion
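
The first two kill switches can be illustrated with a small RAII sketch. Everything here is a hypothetical stand-in (`DEMO_ENABLE_TELEMETRY`, `DEMO_TRACE_SCOPE`, `DemoSpanGuard`, `tracingEnabled`, `finishedSpans`), not the `XRPL_TRACE_*` macros from this commit; it only shows how a compile-time flag can remove the tracing code entirely while a runtime flag turns it off in a build that includes it.

```cpp
#include <atomic>
#include <string>
#include <utility>
#include <vector>

// Comment this out to compile the no-op path (kill switch 2).
#define DEMO_ENABLE_TELEMETRY

// Hypothetical stand-in for an exporter: collects the names of finished spans.
inline std::vector<std::string>&
finishedSpans()
{
    static std::vector<std::string> spans;
    return spans;
}

// Hypothetical runtime switch, analogous to `enabled=0` (kill switch 1).
inline std::atomic<bool>&
tracingEnabled()
{
    static std::atomic<bool> on{true};
    return on;
}

#ifdef DEMO_ENABLE_TELEMETRY
// RAII guard: the span ends when the guard leaves scope.
class DemoSpanGuard
{
    std::string name_;
    bool active_;

public:
    explicit DemoSpanGuard(std::string name)
        : name_(std::move(name)), active_(tracingEnabled().load())
    {
    }
    ~DemoSpanGuard()
    {
        if (active_)
            finishedSpans().push_back(name_);
    }
    DemoSpanGuard(DemoSpanGuard const&) = delete;
    DemoSpanGuard& operator=(DemoSpanGuard const&) = delete;
};
#define DEMO_TRACE_SCOPE(name_expr_) DemoSpanGuard demoSpanGuard_{(name_expr_)}
#else
// With the flag undefined the macro argument is never evaluated.
#define DEMO_TRACE_SCOPE(name_expr_) ((void)0)
#endif

// Example call site: identical source with or without telemetry compiled in.
inline int
handleRequest(int x)
{
    DEMO_TRACE_SCOPE(std::string("rpc.command.") + "ping");
    return x + 1;
}
```

The call site does not change between builds, which is what makes the third option, reverting the instrumentation commits, comparatively low risk.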
---
## Slide 7: Data Collection & Privacy
### What Data is Collected
| Category | Attributes Collected | Purpose |
| --------------- | ---------------------------------------------------------------------------------- | --------------------------- |
| **Transaction** | `tx.hash`, `tx.type`, `tx.result`, `tx.fee`, `ledger_index` | Trace transaction lifecycle |
| **Consensus**   | `round`, `phase`, `mode`, `proposers` (public key or public node id), `duration_ms` | Analyze consensus timing    |
| **RPC** | `command`, `version`, `status`, `duration_ms` | Monitor RPC performance |
| **Peer**        | `peer.id` (public key), `latency_ms`, `message.type`, `message.size`               | Network topology analysis   |
| **Ledger** | `ledger.hash`, `ledger.index`, `close_time`, `tx_count` | Ledger progression tracking |
| **Job** | `job.type`, `queue_ms`, `worker` | JobQueue performance |
### What is NOT Collected (Privacy Guarantees)
```mermaid
flowchart LR
subgraph notCollected["❌ NOT Collected"]
direction LR
A["Private Keys"] ~~~ B["Account Balances"] ~~~ C["Transaction Amounts"]
end
subgraph alsoNot["❌ Also Excluded"]
direction LR
D["IP Addresses<br/>(configurable)"] ~~~ E["Personal Data"] ~~~ F["Raw TX Payloads"]
end
style A fill:#c62828,stroke:#8c2809,color:#fff
style B fill:#c62828,stroke:#8c2809,color:#fff
style C fill:#c62828,stroke:#8c2809,color:#fff
style D fill:#c62828,stroke:#8c2809,color:#fff
style E fill:#c62828,stroke:#8c2809,color:#fff
style F fill:#c62828,stroke:#8c2809,color:#fff
```
### Privacy Protection Mechanisms
| Mechanism | Description |
| -------------------------- | ------------------------------------------------------------- |
| **Account Hashing** | `xrpl.tx.account` is hashed at collector level before storage |
| **Configurable Redaction** | Sensitive fields can be excluded via config |
| **Sampling** | Only 10% of traces recorded by default (reduces exposure) |
| **Local Control** | Node operators control what gets exported |
| **No Raw Payloads** | Transaction content is never recorded, only metadata |
> **Key Principle**: Telemetry collects **operational metadata** (timing, counts, hashes) — never **sensitive content** (keys, balances, amounts).
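
Configurable redaction can be pictured as a filter applied to a span's attributes before export. The sketch below assumes attributes are plain string pairs and that the deny list comes from node configuration; `Attributes` and `redact` are hypothetical names, and the real mechanism (including collector-side hashing of `xrpl.tx.account`) may work differently.

```cpp
#include <map>
#include <set>
#include <string>

using Attributes = std::map<std::string, std::string>;

// Return a copy of `in` without any attribute whose key is on the deny list.
inline Attributes
redact(Attributes const& in, std::set<std::string> const& denied)
{
    Attributes out;
    for (auto const& [key, value] : in)
    {
        if (denied.count(key) == 0)
            out.emplace(key, value);
    }
    return out;
}
```

With a deny list containing `xrpl.tx.account`, a span carrying `tx.type` and `xrpl.tx.account` would be exported with only `tx.type`.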
---
_End of Presentation_


@@ -8,6 +8,7 @@
#include <xrpld/rpc/Role.h>
#include <xrpld/rpc/detail/Handler.h>
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpld/telemetry/TracingInstrumentation.h>
#include <xrpl/basics/Log.h>
#include <xrpl/core/JobQueue.h>
@@ -157,6 +158,11 @@ template <class Object, class Method>
Status
callMethod(JsonContext& context, Method method, std::string const& name, Object& result)
{
XRPL_TRACE_RPC(context.app.getTelemetry(), "rpc.command." + name);
XRPL_TRACE_SET_ATTR("xrpl.rpc.command", name.c_str());
XRPL_TRACE_SET_ATTR("xrpl.rpc.version", static_cast<int64_t>(context.apiVersion));
XRPL_TRACE_SET_ATTR("xrpl.rpc.role", (context.role == Role::ADMIN ? "admin" : "user"));
static std::atomic<std::uint64_t> requestId{0};
auto& perfLog = context.app.getPerfLog();
std::uint64_t const curId = ++requestId;
@@ -172,12 +178,15 @@ callMethod(JsonContext& context, Method method, std::string const& name, Object&
JLOG(context.j.debug()) << "RPC call " << name << " completed in "
<< ((end - start).count() / 1000000000.0) << "seconds";
perfLog.rpcFinish(name, curId);
XRPL_TRACE_SET_ATTR("xrpl.rpc.status", "success");
return ret;
}
catch (std::exception& e)
{
perfLog.rpcError(name, curId);
JLOG(context.j.info()) << "Caught throw: " << e.what();
XRPL_TRACE_EXCEPTION(e);
XRPL_TRACE_SET_ATTR("xrpl.rpc.status", "error");
if (context.loadType == Resource::feeReferenceRPC)
context.loadType = Resource::feeExceptionRPC;


@@ -7,6 +7,7 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpld/rpc/detail/WSInfoSub.h>
#include <xrpld/rpc/json_body.h>
#include <xrpld/telemetry/TracingInstrumentation.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/contract.h>
@@ -46,7 +47,7 @@ static bool
isStatusRequest(http_request_type const& request)
{
return request.version() >= 11 && request.target() == "/" && request.body().size() == 0 &&
request.method() == boost::beast::http::verb::get;
request.method() == boost::beast::http::verb::get;
}
static Handoff
@@ -78,23 +79,22 @@ authorized(Port const& port, std::map<std::string, std::string> const& h)
return false;
std::string strUserPass64 = it->second.substr(6);
boost::trim(strUserPass64);
std::string strUserPass = base64_decode(strUserPass64);
std::string strUserPass = base64_decode(strUserPass64);
std::string::size_type nColon = strUserPass.find(":");
if (nColon == std::string::npos)
return false;
std::string strUser = strUserPass.substr(0, nColon);
std::string strUser = strUserPass.substr(0, nColon);
std::string strPassword = strUserPass.substr(nColon + 1);
return strUser == port.user && strPassword == port.password;
}
ServerHandler::ServerHandler(
ServerHandlerCreator const&,
Application& app,
boost::asio::io_context& io_context,
JobQueue& jobQueue,
NetworkOPs& networkOPs,
Resource::Manager& resourceManager,
CollectorManager& cm)
ServerHandler::ServerHandler(ServerHandlerCreator const&,
Application& app,
boost::asio::io_context& io_context,
JobQueue& jobQueue,
NetworkOPs& networkOPs,
Resource::Manager& resourceManager,
CollectorManager& cm)
: app_(app)
, m_resourceManager(resourceManager)
, m_journal(app_.getJournal("Server"))
@@ -104,8 +104,8 @@ ServerHandler::ServerHandler(
{
auto const& group(cm.group("rpc"));
rpc_requests_ = group->make_counter("requests");
rpc_size_ = group->make_event("size");
rpc_time_ = group->make_event("time");
rpc_size_ = group->make_event("size");
rpc_time_ = group->make_event("time");
}
ServerHandler::~ServerHandler()
@@ -116,7 +116,7 @@ ServerHandler::~ServerHandler()
void
ServerHandler::setup(Setup const& setup, beast::Journal journal)
{
setup_ = setup;
setup_ = setup;
endpoints_ = m_server->ports(setup.ports);
// fix auto ports
@@ -146,7 +146,11 @@ ServerHandler::stop()
m_server->close();
{
std::unique_lock lock(mutex_);
condition_.wait(lock, [this] { return stopped_; });
condition_.wait(lock,
[this]
{
return stopped_;
});
}
}
@@ -157,7 +161,8 @@ ServerHandler::onAccept(Session& session, boost::asio::ip::tcp::endpoint endpoin
{
auto const& port = session.port();
auto const c = [this, &port]() {
auto const c = [this, &port]()
{
std::lock_guard lock(mutex_);
return ++count_[port];
}();
@@ -172,16 +177,15 @@ ServerHandler::onAccept(Session& session, boost::asio::ip::tcp::endpoint endpoin
}
Handoff
ServerHandler::onHandoff(
Session& session,
std::unique_ptr<stream_type>&& bundle,
http_request_type&& request,
boost::asio::ip::tcp::endpoint const& remote_address)
ServerHandler::onHandoff(Session& session,
std::unique_ptr<stream_type>&& bundle,
http_request_type&& request,
boost::asio::ip::tcp::endpoint const& remote_address)
{
using namespace boost::beast;
auto const& p{session.port().protocol};
bool const is_ws{
p.count("ws") > 0 || p.count("ws2") > 0 || p.count("wss") > 0 || p.count("wss2") > 0};
bool const is_ws{p.count("ws") > 0 || p.count("ws2") > 0 || p.count("wss") > 0 ||
p.count("wss2") > 0};
if (websocket::is_upgrade(request))
{
@@ -201,14 +205,16 @@ ServerHandler::onHandoff(
auto is{std::make_shared<WSInfoSub>(m_networkOPs, ws)};
auto const beast_remote_address = beast::IPAddressConversion::from_asio(remote_address);
is->getConsumer() = requestInboundEndpoint(
m_resourceManager,
beast_remote_address,
requestRole(
Role::GUEST, session.port(), Json::Value(), beast_remote_address, is->user()),
is->user(),
is->forwarded_for());
ws->appDefined = std::move(is);
is->getConsumer() = requestInboundEndpoint(m_resourceManager,
beast_remote_address,
requestRole(Role::GUEST,
session.port(),
Json::Value(),
beast_remote_address,
is->user()),
is->user(),
is->forwarded_for());
ws->appDefined = std::move(is);
ws->run();
Handoff handoff;
@@ -229,7 +235,10 @@ ServerHandler::onHandoff(
static inline Json::Output
makeOutput(Session& session)
{
return [&](boost::beast::string_view const& b) { session.write(b.data(), b.size()); };
return [&](boost::beast::string_view const& b)
{
session.write(b.data(), b.size());
};
}
static std::map<std::string, std::string>
@@ -241,9 +250,13 @@ build_map(boost::beast::http::fields const& h)
// key cannot be a std::string_view because it needs to be used in
// map and along with iterators
std::string key(e.name_string());
std::transform(key.begin(), key.end(), key.begin(), [](auto kc) {
return std::tolower(static_cast<unsigned char>(kc));
});
std::transform(key.begin(),
key.end(),
key.begin(),
[](auto kc)
{
return std::tolower(static_cast<unsigned char>(kc));
});
c[key] = e.value();
}
return c;
@@ -266,6 +279,8 @@ buffers_to_string(ConstBufferSequence const& bs)
void
ServerHandler::onRequest(Session& session)
{
XRPL_TRACE_RPC(app_.getTelemetry(), "rpc.request");
// Make sure RPC is enabled on the port
if (session.port().protocol.count("http") == 0 && session.port().protocol.count("https") == 0)
{
@@ -283,10 +298,13 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
});
auto const postResult =
m_jobQueue.postCoro(jtCLIENT_RPC,
"RPC-Client",
[this, detachedSession](std::shared_ptr<JobQueue::Coro> coro)
{
processSession(detachedSession, coro);
});
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.
@@ -297,22 +315,24 @@ ServerHandler::onRequest(Session& session)
}
void
ServerHandler::onWSMessage(
std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers)
ServerHandler::onWSMessage(std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers)
{
Json::Value jv;
auto const size = boost::asio::buffer_size(buffers);
if (size > RPC::Tuning::maxRequestSize || !Json::Reader{}.parse(jv, buffers) || !jv.isObject())
{
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid";
jvResult[jss::value] = buffers_to_string(buffers);
boost::beast::multi_buffer sb;
Json::stream(jvResult, [&sb](auto const p, auto const n) {
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(p, n)));
});
Json::stream(
jvResult,
[&sb](auto const p, auto const n)
{
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(p, n)));
});
JLOG(m_journal.trace()) << "Websocket sending '" << jvResult << "'";
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
@@ -324,10 +344,11 @@ ServerHandler::onWSMessage(
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro)
{
auto const jr = this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
@@ -362,7 +383,8 @@ void
logDuration(Json::Value const& request, T const& duration, beast::Journal& journal)
{
using namespace std::chrono_literals;
auto const level = [&]() {
auto const level = [&]()
{
if (duration >= 10s)
return journal.error();
if (duration >= 1s)
@@ -376,11 +398,11 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
ServerHandler::processSession(std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
{
XRPL_TRACE_RPC(app_.getTelemetry(), "rpc.ws_message");
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
{
@@ -403,10 +425,10 @@ ServerHandler::processSession(
(jv.isMember(jss::command) && jv.isMember(jss::method) &&
jv[jss::command].asString() != jv[jss::method].asString()))
{
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = apiVersion == RPC::apiInvalidVersion ? jss::invalid_API_version
: jss::missingCommand;
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = apiVersion == RPC::apiInvalidVersion ? jss::invalid_API_version
: jss::missingCommand;
jr[jss::request] = jv;
if (jv.isMember(jss::id))
jr[jss::id] = jv[jss::id];
@@ -425,32 +447,30 @@ ServerHandler::processSession(
apiVersion,
app_.config().BETA_RPC_API,
jv.isMember(jss::command) ? jv[jss::command].asString() : jv[jss::method].asString());
auto role = requestRole(
required,
session->port(),
jv,
beast::IP::from_asio(session->remote_endpoint().address()),
is->user());
auto role = requestRole(required,
session->port(),
jv,
beast::IP::from_asio(session->remote_endpoint().address()),
is->user());
if (Role::FORBID == role)
{
loadType = Resource::feeMalformedRPC;
loadType = Resource::feeMalformedRPC;
jr[jss::result] = rpcError(rpcFORBIDDEN);
}
else
{
RPC::JsonContext context{
{app_.getJournal("RPCHandler"),
app_,
loadType,
app_.getOPs(),
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
{is->user(), is->forwarded_for()}};
RPC::JsonContext context{{app_.getJournal("RPCHandler"),
app_,
loadType,
app_.getOPs(),
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
{is->user(), is->forwarded_for()}};
auto start = std::chrono::system_clock::now();
RPC::doCommand(context, jr[jss::result]);
@@ -478,7 +498,7 @@ ServerHandler::processSession(
// Regularize result. This is duplicate code.
if (jr[jss::result].isMember(jss::error))
{
jr = jr[jss::result];
jr = jr[jss::result];
jr[jss::status] = jss::error;
auto rq = jv;
@@ -519,23 +539,22 @@ ServerHandler::processSession(
// Run as a coroutine.
void
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
ServerHandler::processSession(std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
if (iter != session->request().end())
return iter->value();
return boost::beast::string_view{};
}());
processRequest(session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&]
{
auto const iter = session->request().find("X-User");
if (iter != session->request().end())
return iter->value();
return boost::beast::string_view{};
}());
if (beast::rfc2616::is_keep_alive(session->request()))
{
@@ -551,28 +570,28 @@ static Json::Value
make_json_error(Json::Int code, Json::Value&& message)
{
Json::Value sub{Json::objectValue};
sub["code"] = code;
sub["code"] = code;
sub["message"] = std::move(message);
Json::Value r{Json::objectValue};
r["error"] = sub;
return r;
}
Json::Int constexpr method_not_found = -32601;
Json::Int constexpr method_not_found = -32601;
Json::Int constexpr server_overloaded = -32604;
Json::Int constexpr forbidden = -32605;
Json::Int constexpr wrong_version = -32606;
Json::Int constexpr forbidden = -32605;
Json::Int constexpr wrong_version = -32606;
void
ServerHandler::processRequest(
Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output const& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
ServerHandler::processRequest(Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output const& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
{
XRPL_TRACE_RPC(app_.getTelemetry(), "rpc.process");
auto rpcJ = app_.getJournal("RPC");
Json::Value jsonOrig;
@@ -581,16 +600,15 @@ ServerHandler::processRequest(
if ((request.size() > RPC::Tuning::maxRequestSize) || !reader.parse(request, jsonOrig) ||
!jsonOrig || !jsonOrig.isObject())
{
HTTPReply(
400,
"Unable to parse request: " + reader.getFormattedErrorMessages(),
output,
rpcJ);
HTTPReply(400,
"Unable to parse request: " + reader.getFormattedErrorMessages(),
output,
rpcJ);
return;
}
}
bool batch = false;
bool batch = false;
unsigned size = 1;
if (jsonOrig.isMember(jss::method) && jsonOrig[jss::method] == "batch")
{
@@ -613,7 +631,7 @@ ServerHandler::processRequest(
{
Json::Value r(Json::objectValue);
r[jss::request] = jsonRPC;
r[jss::error] = make_json_error(method_not_found, "Method not found");
r[jss::error] = make_json_error(method_not_found, "Method not found");
reply.append(r);
continue;
}
@@ -622,8 +640,8 @@ ServerHandler::processRequest(
if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() &&
jsonRPC[jss::params].size() > 0 && jsonRPC[jss::params][0u].isObject())
{
apiVersion = RPC::getAPIVersionNumber(
jsonRPC[jss::params][Json::UInt(0)], app_.config().BETA_RPC_API);
apiVersion = RPC::getAPIVersionNumber(jsonRPC[jss::params][Json::UInt(0)],
app_.config().BETA_RPC_API);
}
if (apiVersion == RPC::apiVersionIfUnspecified && batch)
@@ -641,25 +659,29 @@ ServerHandler::processRequest(
}
Json::Value r(Json::objectValue);
r[jss::request] = jsonRPC;
r[jss::error] = make_json_error(wrong_version, jss::invalid_API_version.c_str());
r[jss::error] = make_json_error(wrong_version, jss::invalid_API_version.c_str());
reply.append(r);
continue;
}
/* ------------------------------------------------------------------ */
auto role = Role::FORBID;
auto role = Role::FORBID;
auto required = Role::FORBID;
if (jsonRPC.isMember(jss::method) && jsonRPC[jss::method].isString())
{
required = RPC::roleRequired(
apiVersion, app_.config().BETA_RPC_API, jsonRPC[jss::method].asString());
required = RPC::roleRequired(apiVersion,
app_.config().BETA_RPC_API,
jsonRPC[jss::method].asString());
}
if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() &&
jsonRPC[jss::params].size() > 0 && jsonRPC[jss::params][Json::UInt(0)].isObjectOrNull())
{
role = requestRole(
required, port, jsonRPC[jss::params][Json::UInt(0)], remoteIPAddress, user);
role = requestRole(required,
port,
jsonRPC[jss::params][Json::UInt(0)],
remoteIPAddress,
user);
}
else
{
@@ -673,8 +695,9 @@ ServerHandler::processRequest(
}
else
{
usage = m_resourceManager.newInboundEndpoint(
remoteIPAddress, role == Role::PROXY, forwardedFor);
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress,
role == Role::PROXY,
forwardedFor);
if (usage.disconnect(m_journal))
{
if (!batch)
@@ -821,19 +844,18 @@ ServerHandler::processRequest(
Resource::Charge loadType = Resource::feeReferenceRPC;
RPC::JsonContext context{
{m_journal,
app_,
loadType,
m_networkOPs,
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,
{user, forwardedFor}};
RPC::JsonContext context{{m_journal,
app_,
loadType,
m_networkOPs,
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,
{user, forwardedFor}};
Json::Value result;
auto start = std::chrono::system_clock::now();
@@ -866,8 +888,8 @@ ServerHandler::processRequest(
if (result.isMember(jss::error))
{
result[jss::status] = jss::error;
result["code"] = result[jss::error_code];
result["message"] = result[jss::error_message];
result["code"] = result[jss::error_code];
result["message"] = result[jss::error_message];
result.removeMember(jss::error_message);
JLOG(m_journal.debug())
<< "rpcError: " << result[jss::error] << ": " << result[jss::error_message];
@@ -876,7 +898,7 @@ ServerHandler::processRequest(
else
{
result[jss::status] = jss::success;
r[jss::result] = std::move(result);
r[jss::result] = std::move(result);
}
}
else
@@ -899,7 +921,7 @@ ServerHandler::processRequest(
rq[jss::seed_hex.c_str()] = "<masked>";
}
result[jss::status] = jss::error;
result[jss::status] = jss::error;
result[jss::request] = rq;
JLOG(m_journal.debug())
@@ -939,7 +961,8 @@ ServerHandler::processRequest(
}
// If we're returning an error_code, use that to determine the HTTP status.
int const httpStatus = [&reply]() {
int const httpStatus = [&reply]()
{
// This feature is enabled with ripplerpc version 3.0 and above.
// Before ripplerpc version 3.0 always return 200.
if (reply.isMember(jss::ripplerpc) && reply[jss::ripplerpc].isString() &&
@@ -959,9 +982,8 @@ ServerHandler::processRequest(
auto response = to_string(reply);
rpc_time_.notify(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start));
rpc_time_.notify(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start));
++rpc_requests_;
rpc_size_.notify(beast::insight::Event::value_type{response.size()});
@@ -1000,8 +1022,8 @@ ServerHandler::statusResponse(http_request_type const& request) const
{
msg.result(boost::beast::http::status::ok);
msg.body() = "<!DOCTYPE html><html><head><title>Test page for " + systemName() +
"</title></head><body><h1>Test</h1><p>This page shows " + systemName() +
" http(s) connectivity is working.</p></body></html>";
"</title></head><body><h1>Test</h1><p>This page shows " + systemName() +
" http(s) connectivity is working.</p></body></html>";
}
else
{
@@ -1071,19 +1093,19 @@ to_Port(ParsedPort const& parsed, std::ostream& log)
}
p.protocol = parsed.protocol;
p.user = parsed.user;
p.password = parsed.password;
p.admin_user = parsed.admin_user;
p.admin_password = parsed.admin_password;
p.ssl_key = parsed.ssl_key;
p.ssl_cert = parsed.ssl_cert;
p.ssl_chain = parsed.ssl_chain;
p.ssl_ciphers = parsed.ssl_ciphers;
p.pmd_options = parsed.pmd_options;
p.ws_queue_limit = parsed.ws_queue_limit;
p.limit = parsed.limit;
p.admin_nets_v4 = parsed.admin_nets_v4;
p.admin_nets_v6 = parsed.admin_nets_v6;
p.user = parsed.user;
p.password = parsed.password;
p.admin_user = parsed.admin_user;
p.admin_password = parsed.admin_password;
p.ssl_key = parsed.ssl_key;
p.ssl_cert = parsed.ssl_cert;
p.ssl_chain = parsed.ssl_chain;
p.ssl_ciphers = parsed.ssl_ciphers;
p.pmd_options = parsed.pmd_options;
p.ws_queue_limit = parsed.ws_queue_limit;
p.limit = parsed.limit;
p.admin_nets_v4 = parsed.admin_nets_v4;
p.admin_nets_v6 = parsed.admin_nets_v6;
p.secure_gateway_nets_v4 = parsed.secure_gateway_nets_v4;
p.secure_gateway_nets_v6 = parsed.secure_gateway_nets_v6;
@@ -1146,9 +1168,12 @@ parse_Ports(Config const& config, std::ostream& log)
}
else
{
auto const count = std::count_if(result.cbegin(), result.cend(), [](Port const& p) {
return p.protocol.count("peer") != 0;
});
auto const count = std::count_if(result.cbegin(),
result.cend(),
[](Port const& p)
{
return p.protocol.count("peer") != 0;
});
if (count > 1)
{
@@ -1185,10 +1210,10 @@ setup_Client(ServerHandler::Setup& setup)
{
setup.client.ip = iter->ip.to_string();
}
setup.client.port = iter->port;
setup.client.user = iter->user;
setup.client.password = iter->password;
setup.client.admin_user = iter->admin_user;
setup.client.port = iter->port;
setup.client.user = iter->user;
setup.client.password = iter->password;
setup.client.admin_user = iter->admin_user;
setup.client.admin_password = iter->admin_password;
}
@@ -1196,9 +1221,12 @@ setup_Client(ServerHandler::Setup& setup)
static void
setup_Overlay(ServerHandler::Setup& setup)
{
auto const iter = std::find_if(setup.ports.cbegin(), setup.ports.cend(), [](Port const& port) {
return port.protocol.count("peer") != 0;
});
auto const iter = std::find_if(setup.ports.cbegin(),
setup.ports.cend(),
[](Port const& port)
{
return port.protocol.count("peer") != 0;
});
if (iter == setup.ports.cend())
{
setup.overlay = {};
@@ -1220,22 +1248,20 @@ setup_ServerHandler(Config const& config, std::ostream& log)
}
std::unique_ptr<ServerHandler>
make_ServerHandler(
Application& app,
boost::asio::io_context& io_context,
JobQueue& jobQueue,
NetworkOPs& networkOPs,
Resource::Manager& resourceManager,
CollectorManager& cm)
make_ServerHandler(Application& app,
boost::asio::io_context& io_context,
JobQueue& jobQueue,
NetworkOPs& networkOPs,
Resource::Manager& resourceManager,
CollectorManager& cm)
{
return std::make_unique<ServerHandler>(
ServerHandler::ServerHandlerCreator(),
app,
io_context,
jobQueue,
networkOPs,
resourceManager,
cm);
return std::make_unique<ServerHandler>(ServerHandler::ServerHandlerCreator(),
app,
io_context,
jobQueue,
networkOPs,
resourceManager,
cm);
}
} // namespace xrpl
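
Review note: the parse_Ports and setup_Overlay hunks above only re-wrap the lambda arguments; the logic is unchanged. For readers unfamiliar with the pattern, here is a minimal standalone sketch of it. The `Port` struct is a hypothetical stand-in with only the fields the sketch needs (the real type is not shown in this diff), and `countPeerPorts` and `findPeerPort` are illustrative helper names, not functions from the codebase.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for the real Port type; only what the sketch needs.
struct Port
{
    std::string name;
    std::set<std::string> protocol;
};

// Count the ports that list the "peer" protocol. The parse_Ports hunk
// performs the same count and then checks `count > 1`.
inline std::ptrdiff_t
countPeerPorts(std::vector<Port> const& ports)
{
    return std::count_if(ports.cbegin(), ports.cend(), [](Port const& p) {
        return p.protocol.count("peer") != 0;
    });
}

// Return the first port that lists "peer", or nullptr when there is none.
// The setup_Overlay hunk performs the same search with std::find_if.
inline Port const*
findPeerPort(std::vector<Port> const& ports)
{
    auto const iter = std::find_if(ports.cbegin(), ports.cend(), [](Port const& p) {
        return p.protocol.count("peer") != 0;
    });
    return iter == ports.cend() ? nullptr : &*iter;
}
```

Both helpers take the same predicate, so in real code the lambda could be hoisted into one named function; the diff keeps the two call sites separate.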

@@ -0,0 +1,115 @@
#pragma once
/** Convenience macros for instrumenting code with OpenTelemetry trace spans.

    When XRPL_ENABLE_TELEMETRY is defined, the macros create SpanGuard objects
    that manage span lifetime via RAII. When it is not defined, every macro
    expands to ((void)0), so the macro arguments are never evaluated and the
    instrumentation adds no runtime cost.

    Usage in instrumented code:
    @code
        XRPL_TRACE_RPC(app.getTelemetry(), "rpc.command." + name);
        XRPL_TRACE_SET_ATTR("xrpl.rpc.command", name);
        XRPL_TRACE_SET_ATTR("xrpl.rpc.status", "success");
    @endcode

    @note Every span macro declares a local variable named _xrpl_guard_, so
          at most one span macro can be used per scope, and the macros can
          only be used where a declaration statement is allowed.

    @note Macro parameter names use leading and trailing underscores
          (e.g. _tel_obj_) so that they cannot collide with identifiers in
          the macro body, specifically the ::xrpl::telemetry:: namespace
          qualifier.
*/
#ifdef XRPL_ENABLE_TELEMETRY
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/telemetry/Telemetry.h>
#include <optional>
namespace xrpl {
namespace telemetry {
/** Start an unconditional span, ended when the guard goes out of scope.
    The guard is held in a std::optional, as in the conditional macros, so
    that XRPL_TRACE_SET_ATTR and XRPL_TRACE_EXCEPTION (which call
    has_value() and operator-> on _xrpl_guard_) also compile after it.
    @param _tel_obj_ Telemetry instance reference.
    @param _span_name_ Span name string.
*/
#define XRPL_TRACE_SPAN(_tel_obj_, _span_name_)                \
    std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_;  \
    _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_))
/** Start an unconditional span with a specific SpanKind.
    The guard is held in a std::optional for the same reason as in
    XRPL_TRACE_SPAN.
    @param _tel_obj_ Telemetry instance reference.
    @param _span_name_ Span name string.
    @param _span_kind_ opentelemetry::trace::SpanKind value.
*/
#define XRPL_TRACE_SPAN_KIND(_tel_obj_, _span_name_, _span_kind_)  \
    std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_;      \
    _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_, _span_kind_))
/** Conditionally start a span for RPC tracing.
    The span is only created if shouldTraceRpc() returns true.
    @param _tel_obj_ Telemetry instance reference.
    @param _span_name_ Span name string.
*/
#define XRPL_TRACE_RPC(_tel_obj_, _span_name_)                     \
    std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_;      \
    if ((_tel_obj_).shouldTraceRpc())                              \
    {                                                              \
        _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_));  \
    }
/** Conditionally start a span for transaction tracing.
    The span is only created if shouldTraceTransactions() returns true.
    @param _tel_obj_ Telemetry instance reference.
    @param _span_name_ Span name string.
*/
#define XRPL_TRACE_TX(_tel_obj_, _span_name_)                      \
    std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_;      \
    if ((_tel_obj_).shouldTraceTransactions())                     \
    {                                                              \
        _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_));  \
    }
/** Conditionally start a span for consensus tracing.
    The span is only created if shouldTraceConsensus() returns true.
    @param _tel_obj_ Telemetry instance reference.
    @param _span_name_ Span name string.
*/
#define XRPL_TRACE_CONSENSUS(_tel_obj_, _span_name_)               \
    std::optional<::xrpl::telemetry::SpanGuard> _xrpl_guard_;      \
    if ((_tel_obj_).shouldTraceConsensus())                        \
    {                                                              \
        _xrpl_guard_.emplace((_tel_obj_).startSpan(_span_name_));  \
    }
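/** Set a key-value attribute on the current span (if it exists).
    Must be used after one of the XRPL_TRACE_* span macros, where
    _xrpl_guard_ is in scope. Wrapped in do { } while (false) so that the
    macro acts as a single statement, including as the body of an unbraced
    if/else.
*/
#define XRPL_TRACE_SET_ATTR(key, value)                  \
    do                                                   \
    {                                                    \
        if (_xrpl_guard_.has_value())                    \
        {                                                \
            _xrpl_guard_->setAttribute((key), (value));  \
        }                                                \
    } while (false)
/** Record an exception on the current span and mark it as error.
    Must be used after one of the XRPL_TRACE_* span macros, where
    _xrpl_guard_ is in scope. Wrapped in do { } while (false) for the same
    reason as XRPL_TRACE_SET_ATTR.
*/
#define XRPL_TRACE_EXCEPTION(e)                 \
    do                                          \
    {                                           \
        if (_xrpl_guard_.has_value())           \
        {                                       \
            _xrpl_guard_->recordException((e)); \
        }                                       \
    } while (false)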
} // namespace telemetry
} // namespace xrpl
#else // XRPL_ENABLE_TELEMETRY not defined
#define XRPL_TRACE_SPAN(_tel_obj_, _span_name_) ((void)0)
#define XRPL_TRACE_SPAN_KIND(_tel_obj_, _span_name_, _span_kind_) ((void)0)
#define XRPL_TRACE_RPC(_tel_obj_, _span_name_) ((void)0)
#define XRPL_TRACE_TX(_tel_obj_, _span_name_) ((void)0)
#define XRPL_TRACE_CONSENSUS(_tel_obj_, _span_name_) ((void)0)
#define XRPL_TRACE_SET_ATTR(key, value) ((void)0)
#define XRPL_TRACE_EXCEPTION(e) ((void)0)
#endif // XRPL_ENABLE_TELEMETRY
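
Review note: a minimal standalone sketch of the pattern behind the conditional macros in this header, namely an optional RAII guard that is only engaged when tracing is enabled. It does not use OpenTelemetry or the real xrpl types. `DemoTelemetry`, `DemoSpanGuard`, `DEMO_TRACE_RPC`, `DEMO_TRACE_SET_ATTR`, and `handleCommand` are hypothetical names invented for this illustration, and the guard only appends to an event list so that the span lifetime can be observed.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Telemetry object: a switch plus an event log.
struct DemoTelemetry
{
    bool traceRpc = true;
    std::vector<std::string> events;

    bool
    shouldTraceRpc() const
    {
        return traceRpc;
    }
};

// Hypothetical stand-in for SpanGuard: logs on construction and destruction.
class DemoSpanGuard
{
    DemoTelemetry* tel_;
    std::string name_;

public:
    DemoSpanGuard(DemoTelemetry& tel, std::string name)
        : tel_(&tel), name_(std::move(name))
    {
        tel_->events.push_back("start " + name_);
    }
    DemoSpanGuard(DemoSpanGuard const&) = delete;
    DemoSpanGuard&
    operator=(DemoSpanGuard const&) = delete;
    ~DemoSpanGuard()
    {
        tel_->events.push_back("end " + name_);
    }

    void
    setAttribute(std::string const& key, std::string const& value)
    {
        tel_->events.push_back("attr " + key + "=" + value);
    }
};

// Declare an empty optional guard, then engage it only if tracing is on.
#define DEMO_TRACE_RPC(_tel_obj_, _span_name_)             \
    std::optional<DemoSpanGuard> _demo_guard_;             \
    if ((_tel_obj_).shouldTraceRpc())                      \
    {                                                      \
        _demo_guard_.emplace((_tel_obj_), (_span_name_));  \
    }

// Forward to the guard only when it is engaged; single-statement safe.
#define DEMO_TRACE_SET_ATTR(_key_, _value_)                  \
    do                                                       \
    {                                                        \
        if (_demo_guard_.has_value())                        \
            _demo_guard_->setAttribute((_key_), (_value_));  \
    } while (false)

inline std::string
handleCommand(DemoTelemetry& tel, std::string const& name)
{
    DEMO_TRACE_RPC(tel, "rpc.command." + name);
    DEMO_TRACE_SET_ATTR("xrpl.rpc.command", name);
    return "ok";  // the guard, if engaged, ends the span when it is destroyed
}
```

With `traceRpc` set to false the optional is never engaged, so `handleCommand` records no events at all. The cost of a disabled category is then one branch per macro, which is separate from the compile-time removal that the `#else` branch of the header provides.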