Phase 9: Internal Metric Instrumentation Gap Fill (Tasks 9.1-9.10)

Implement ~50 OTel metrics covering NodeStore I/O, cache hit rates,
TxQ state, PerfLog per-RPC/per-job counters, CountedObject instances,
and load factor breakdown via MetricsRegistry.

Core implementation:
- MetricsRegistry class with synchronous instruments (Counter, Histogram)
  for RPC and Job metrics, and ObservableGauge callbacks for cache, TxQ,
  CountedObject, LoadFactor, and NodeStore state polling.
- ServiceRegistry extended with getMetricsRegistry() virtual method.
- Application wires MetricsRegistry lifecycle (create/start/stop).
- PerfLogImp instrumented to emit OTel metrics on RPC and Job events.

Dashboards & observability:
- 3 new Grafana dashboards: RPC Performance, Job Queue, Fee Market/TxQ.
- Extended statsd-node-health dashboard with NodeStore, Cache, and
  CountedObject panels.
- 10 alerting rules added to telemetry-runbook.md.
- Integration test extended with 12 OTel metric validation checks.

Documentation:
- 09-data-collection-reference.md updated with Phase 9 metric tables.
- Unit tests for MetricsRegistry disabled-path (no-op) behavior.

All OTel SDK code guarded with #ifdef XRPL_ENABLE_TELEMETRY.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-03-10 16:20:54 +00:00
parent b73592f934
commit 9289cb671d
13 changed files with 2722 additions and 11 deletions

View File

@@ -0,0 +1,374 @@
/** Unit tests for MetricsRegistry.
Tests cover:
- Construction with telemetry disabled (no-op behavior).
- start()/stop() lifecycle when disabled.
- Synchronous instrument recording methods do not crash when disabled.
- Double stop() is safe.
NOTE: Tests that exercise the OTel SDK path require XRPL_ENABLE_TELEMETRY
to be defined at build time (telemetry=ON). The no-op path tests run
unconditionally.
*/
#include <xrpld/telemetry/MetricsRegistry.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/ServiceRegistry.h>
namespace xrpl {
namespace test {
/** Stand-in ServiceRegistry used to construct a MetricsRegistry in tests.

    Every test in this file builds the MetricsRegistry with enabled=false,
    so no gauge callbacks are registered and none of the service accessors
    below are expected to run. Accessors that would have to return a real
    service therefore throw std::logic_error, which makes an accidental
    call fail loudly. The few accessors that can return a harmless value
    (null registry, "not stopping", null-sink journal, empty trap id) do so.
*/
class MockServiceRegistry : public ServiceRegistry
{
    /// Shared failure path for every accessor the tests must never reach.
    [[noreturn]] static void
    failUnexpectedCall()
    {
        Throw<std::logic_error>("MockServiceRegistry: method not implemented");
    }

public:
    // --- Accessors that must never be reached: each one throws. ---

    CollectorManager& getCollectorManager() override { failUnexpectedCall(); }
    Family& getNodeFamily() override { failUnexpectedCall(); }
    TimeKeeper& timeKeeper() override { failUnexpectedCall(); }
    JobQueue& getJobQueue() override { failUnexpectedCall(); }
    NodeCache& getTempNodeCache() override { failUnexpectedCall(); }
    CachedSLEs& cachedSLEs() override { failUnexpectedCall(); }
    NetworkIDService& getNetworkIDService() override { failUnexpectedCall(); }
    AmendmentTable& getAmendmentTable() override { failUnexpectedCall(); }
    HashRouter& getHashRouter() override { failUnexpectedCall(); }
    LoadFeeTrack& getFeeTrack() override { failUnexpectedCall(); }
    LoadManager& getLoadManager() override { failUnexpectedCall(); }
    RCLValidations& getValidations() override { failUnexpectedCall(); }
    ValidatorList& validators() override { failUnexpectedCall(); }
    ValidatorSite& validatorSites() override { failUnexpectedCall(); }
    ManifestCache& validatorManifests() override { failUnexpectedCall(); }
    ManifestCache& publisherManifests() override { failUnexpectedCall(); }
    Overlay& overlay() override { failUnexpectedCall(); }
    Cluster& cluster() override { failUnexpectedCall(); }
    PeerReservationTable& peerReservations() override { failUnexpectedCall(); }
    Resource::Manager& getResourceManager() override { failUnexpectedCall(); }
    NodeStore::Database& getNodeStore() override { failUnexpectedCall(); }
    SHAMapStore& getSHAMapStore() override { failUnexpectedCall(); }
    RelationalDatabase& getRelationalDatabase() override { failUnexpectedCall(); }
    InboundLedgers& getInboundLedgers() override { failUnexpectedCall(); }
    InboundTransactions& getInboundTransactions() override { failUnexpectedCall(); }
    TaggedCache<uint256, AcceptedLedger>& getAcceptedLedgerCache() override { failUnexpectedCall(); }
    LedgerMaster& getLedgerMaster() override { failUnexpectedCall(); }
    LedgerCleaner& getLedgerCleaner() override { failUnexpectedCall(); }
    LedgerReplayer& getLedgerReplayer() override { failUnexpectedCall(); }
    PendingSaves& pendingSaves() override { failUnexpectedCall(); }
    OpenLedger& openLedger() override { failUnexpectedCall(); }
    OpenLedger const& openLedger() const override { failUnexpectedCall(); }
    NetworkOPs& getOPs() override { failUnexpectedCall(); }
    OrderBookDB& getOrderBookDB() override { failUnexpectedCall(); }
    TransactionMaster& getMasterTransaction() override { failUnexpectedCall(); }
    TxQ& getTxQ() override { failUnexpectedCall(); }
    PathRequests& getPathRequests() override { failUnexpectedCall(); }
    ServerHandler& getServerHandler() override { failUnexpectedCall(); }
    perf::PerfLog& getPerfLog() override { failUnexpectedCall(); }
    telemetry::Telemetry& getTelemetry() override { failUnexpectedCall(); }

    // --- Accessors with a harmless answer. ---

    /// The mock owns no MetricsRegistry.
    telemetry::MetricsRegistry*
    getMetricsRegistry() override
    {
        return nullptr;
    }

    /// The mock is never in the process of shutting down.
    bool
    isStopping() const override
    {
        return false;
    }

    /// Any requested journal writes to the null sink.
    beast::Journal
    journal(std::string const&) override
    {
        return beast::Journal{beast::Journal::getNullSink()};
    }

    boost::asio::io_context& getIOContext() override { failUnexpectedCall(); }
    Logs& logs() override { failUnexpectedCall(); }

    /// No trap transaction is configured; a shared empty optional is returned.
    std::optional<uint256> const&
    trapTxID() const override
    {
        static std::optional<uint256> const noTrap{};
        return noTrap;
    }

    DatabaseCon& getWalletDB() override { failUnexpectedCall(); }
    Application& app() override { failUnexpectedCall(); }
};
class MetricsRegistry_test : public beast::unit_test::suite
{
void
testDisabledConstruction()
{
testcase("Disabled construction");
MockServiceRegistry mockApp;
beast::Journal j(beast::Journal::getNullSink());
// Construct with enabled=false; should be a no-op.
telemetry::MetricsRegistry registry(false, mockApp, j);
BEAST_EXPECT(!registry.isEnabled());
}
void
testDisabledStartStop()
{
testcase("Disabled start/stop");
MockServiceRegistry mockApp;
beast::Journal j(beast::Journal::getNullSink());
telemetry::MetricsRegistry registry(false, mockApp, j);
// start() and stop() should be no-ops when disabled.
registry.start("http://localhost:4318/v1/metrics");
registry.stop();
// Double stop should be safe.
registry.stop();
pass();
}
void
testDisabledRecording()
{
testcase("Disabled recording methods");
MockServiceRegistry mockApp;
beast::Journal j(beast::Journal::getNullSink());
telemetry::MetricsRegistry registry(false, mockApp, j);
registry.start("http://localhost:4318/v1/metrics");
// All recording methods should be no-ops (not crash).
registry.recordRpcStarted("server_info");
registry.recordRpcFinished("server_info", 1000);
registry.recordRpcErrored("ledger", 500);
registry.recordJobQueued("ledgerData");
registry.recordJobStarted("ledgerData", 200);
registry.recordJobFinished("ledgerData", 3000);
registry.stop();
pass();
}
void
testDestructorStops()
{
testcase("Destructor calls stop");
MockServiceRegistry mockApp;
beast::Journal j(beast::Journal::getNullSink());
{
// Let the destructor handle cleanup.
telemetry::MetricsRegistry registry(false, mockApp, j);
registry.start("http://localhost:4318/v1/metrics");
}
// If we get here without crash, the destructor handled stop.
pass();
}
public:
void
run() override
{
testDisabledConstruction();
testDisabledStartStop();
testDisabledRecording();
testDestructorStops();
}
};
BEAST_DEFINE_TESTSUITE(MetricsRegistry, telemetry, ripple);
} // namespace test
} // namespace xrpl

View File

@@ -29,6 +29,7 @@
#include <xrpld/overlay/PeerSet.h>
#include <xrpld/overlay/make_Overlay.h>
#include <xrpld/shamap/NodeFamily.h>
#include <xrpld/telemetry/MetricsRegistry.h>
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/ResolverAsio.h>
@@ -148,6 +149,9 @@ public:
beast::Journal m_journal;
std::unique_ptr<perf::PerfLog> perfLog_;
std::unique_ptr<telemetry::Telemetry> telemetry_;
/// OTel metrics registry for gap-fill metrics (counters, histograms,
/// observable gauges). Created after telemetry_ during setup().
std::unique_ptr<telemetry::MetricsRegistry> metricsRegistry_;
Application::MutexType m_masterMutex;
// Required by the SHAMapStore
@@ -633,6 +637,12 @@ public:
return *telemetry_;
}
/// Non-owning access to the OTel metrics registry held in metricsRegistry_.
/// Returns whatever the unique_ptr currently holds, so the result is null
/// until setup() has created the registry; callers must check for null.
telemetry::MetricsRegistry*
getMetricsRegistry() override
{
return metricsRegistry_.get();
}
NodeCache&
getTempNodeCache() override
{
@@ -1282,6 +1292,11 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
if (!config_->section("telemetry").exists("service_instance_id"))
telemetry_->setServiceInstanceId(toBase58(TokenType::NodePublic, nodeIdentity_->first));
// Create the OTel MetricsRegistry for gap-fill metrics (counters,
// histograms, observable gauges). It is started later in start().
metricsRegistry_ = std::make_unique<telemetry::MetricsRegistry>(
telemetry_->isEnabled(), *this, logs_->journal("MetricsRegistry"));
if (!cluster_->load(config().section(SECTION_CLUSTER_NODES)))
{
JLOG(m_journal.fatal()) << "Invalid entry in cluster configuration.";
@@ -1494,6 +1509,16 @@ ApplicationImp::start(bool withTimers)
ledgerCleaner_->start();
perfLog_->start();
telemetry_->start();
// Start the metrics pipeline after telemetry. The endpoint defaults to
// http://localhost:4318/v1/metrics and can be overridden with the
// `metrics_endpoint` key in the [telemetry] config section.
if (metricsRegistry_)
{
auto const& section = config_->section("telemetry");
std::string endpoint = "http://localhost:4318/v1/metrics";
set(endpoint, "metrics_endpoint", section);
metricsRegistry_->start(endpoint);
}
}
void
@@ -1584,6 +1609,10 @@ ApplicationImp::run()
ledgerCleaner_->stop();
m_nodeStore->stop();
perfLog_->stop();
// Stop metrics pipeline before telemetry — gauge callbacks reference
// Application services that may be shutting down.
if (metricsRegistry_)
metricsRegistry_->stop();
// Telemetry must stop last among trace-producing components.
// serverHandler_, overlay_, and jobQueue_ are already stopped above,
// so no threads should be calling startSpan() at this point.

View File

@@ -1,9 +1,11 @@
#include <xrpld/perflog/detail/PerfLogImp.h>
#include <xrpld/telemetry/MetricsRegistry.h>
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/ServiceRegistry.h>
#include <xrpl/json/json_writer.h>
#include <atomic>
@@ -314,6 +316,10 @@ PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
}
std::lock_guard lock(counters_.methodsMutex_);
counters_.methods_[requestId] = {counter->first.c_str(), steady_clock::now()};
// Task 9.4: Record RPC start in OTel metrics pipeline.
if (auto* mr = app_.getMetricsRegistry())
mr->recordRpcStarted(method);
}
void
@@ -343,13 +349,25 @@ PerfLogImp::rpcEnd(std::string const& method, std::uint64_t const requestId, boo
// LCOV_EXCL_STOP
}
}
std::lock_guard lock(counter->second.mutex);
if (finish)
++counter->second.value.finished;
else
++counter->second.value.errored;
counter->second.value.duration +=
std::chrono::duration_cast<microseconds>(steady_clock::now() - startTime);
auto const duration = std::chrono::duration_cast<microseconds>(steady_clock::now() - startTime);
{
std::lock_guard lock(counter->second.mutex);
if (finish)
++counter->second.value.finished;
else
++counter->second.value.errored;
counter->second.value.duration += duration;
}
// Task 9.4: Record RPC completion/error in OTel metrics pipeline.
if (auto* mr = app_.getMetricsRegistry())
{
auto const durUs = duration.count();
if (finish)
mr->recordRpcFinished(method, durUs);
else
mr->recordRpcErrored(method, durUs);
}
}
void
@@ -365,6 +383,10 @@ PerfLogImp::jobQueue(JobType const type)
}
std::lock_guard lock(counter->second.mutex);
++counter->second.value.queued;
// Task 9.5: Record job enqueue in OTel metrics pipeline.
if (auto* mr = app_.getMetricsRegistry())
mr->recordJobQueued(JobTypes::name(type));
}
void
@@ -391,6 +413,10 @@ PerfLogImp::jobStart(
std::lock_guard lock(counters_.jobsMutex_);
if (instance >= 0 && instance < counters_.jobs_.size())
counters_.jobs_[instance] = {type, startTime};
// Task 9.5: Record job start in OTel metrics pipeline.
if (auto* mr = app_.getMetricsRegistry())
mr->recordJobStarted(JobTypes::name(type), dur.count());
}
void
@@ -413,6 +439,10 @@ PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
std::lock_guard lock(counters_.jobsMutex_);
if (instance >= 0 && instance < counters_.jobs_.size())
counters_.jobs_[instance] = {jtINVALID, steady_time_point()};
// Task 9.5: Record job finish in OTel metrics pipeline.
if (auto* mr = app_.getMetricsRegistry())
mr->recordJobFinished(JobTypes::name(type), dur.count());
}
void

View File

@@ -0,0 +1,473 @@
/** MetricsRegistry implementation — OpenTelemetry metric instruments for rippled.
This file contains:
- Construction / destruction logic for the OTel MeterProvider pipeline.
- Synchronous instrument creation (counters, histograms) for RPC and job
queue metrics.
- Observable gauge callback registration for cache hit rates and sizes,
TxQ state, CountedObject instances, load factors, and NodeStore I/O
counters, write load, and read queue depth.
- No-op stubs when XRPL_ENABLE_TELEMETRY is not defined.
*/
#include <xrpld/telemetry/MetricsRegistry.h>
#ifdef XRPL_ENABLE_TELEMETRY
#include <xrpld/app/ledger/AcceptedLedger.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/misc/TxQ.h>
#include <xrpl/basics/CountedObject.h>
#include <xrpl/core/ServiceRegistry.h>
#include <xrpl/nodestore/Database.h>
#include <xrpl/server/LoadFeeTrack.h>
#include <opentelemetry/exporters/otlp/otlp_http_metric_exporter_factory.h>
#include <opentelemetry/exporters/otlp/otlp_http_metric_exporter_options.h>
#include <opentelemetry/metrics/provider.h>
#include <opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_factory.h>
#include <opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h>
#include <opentelemetry/sdk/metrics/meter_provider.h>
#include <opentelemetry/sdk/metrics/meter_provider_factory.h>
namespace metric_api = opentelemetry::metrics;
namespace metric_sdk = opentelemetry::sdk::metrics;
namespace otlp_http = opentelemetry::exporter::otlp;
#endif // XRPL_ENABLE_TELEMETRY
namespace xrpl {
namespace telemetry {
MetricsRegistry::MetricsRegistry(bool enabled, ServiceRegistry& app, beast::Journal journal)
: enabled_(enabled), app_(app), journal_(journal)
{
}
/** Shut the pipeline down on destruction.

    stop() returns immediately when no provider exists, so destroying a
    registry that was never started, or was already stopped, is harmless.
*/
MetricsRegistry::~MetricsRegistry()
{
    this->stop();
}
/** Build the OTLP/HTTP export pipeline and create every instrument.

    Does nothing when the registry is disabled or when the build has no
    XRPL_ENABLE_TELEMETRY. Otherwise: creates the exporter and a periodic
    reader (10 s interval, 5 s export timeout), attaches the reader to a
    fresh MeterProvider, creates the RPC and job-queue counters/histograms,
    and finally registers the observable gauges.

    @param endpoint OTLP/HTTP URL the exporter posts metrics to.
*/
void
MetricsRegistry::start(std::string const& endpoint)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_)
        return;

    JLOG(journal_.info()) << "MetricsRegistry: starting, endpoint=" << endpoint;

    // Exporter: OTLP over HTTP, pointed at the supplied endpoint.
    otlp_http::OtlpHttpMetricExporterOptions httpOptions;
    httpOptions.url = endpoint;
    auto httpExporter = otlp_http::OtlpHttpMetricExporterFactory::Create(httpOptions);

    // Reader: collect and export every 10 seconds, give each export 5 seconds.
    metric_sdk::PeriodicExportingMetricReaderOptions periodicOptions;
    periodicOptions.export_interval_millis = std::chrono::seconds(10);
    periodicOptions.export_timeout_millis = std::chrono::seconds(5);
    auto periodicReader = metric_sdk::PeriodicExportingMetricReaderFactory::Create(
        std::move(httpExporter), periodicOptions);

    // Provider: owns the reader (and, through it, the exporter).
    provider_ = std::make_shared<metric_sdk::MeterProvider>();
    provider_->AddMetricReader(std::move(periodicReader));

    // One meter serves every rippled instrument.
    meter_ = provider_->GetMeter("rippled", "1.0.0");

    // Small factories so each instrument below is a single readable line.
    auto const newCounter = [this](char const* name, char const* description) {
        return meter_->CreateUInt64Counter(name, description);
    };
    auto const newHistogram = [this](char const* name, char const* description) {
        return meter_->CreateDoubleHistogram(name, description);
    };

    // RPC: per-method counters plus a duration histogram.
    rpcStartedCounter_ = newCounter("rpc_method_started_total", "Total RPC method calls started");
    rpcFinishedCounter_ = newCounter(
        "rpc_method_finished_total", "Total RPC method calls completed successfully");
    rpcErroredCounter_ =
        newCounter("rpc_method_errored_total", "Total RPC method calls that errored");
    rpcDurationHistogram_ =
        newHistogram("rpc_method_duration_us", "RPC method execution time in microseconds");

    // Job queue: per-type counters plus queue-wait and run-time histograms.
    jobQueuedCounter_ = newCounter("job_queued_total", "Total jobs enqueued");
    jobStartedCounter_ = newCounter("job_started_total", "Total jobs started");
    jobFinishedCounter_ = newCounter("job_finished_total", "Total jobs completed");
    jobQueuedDurationHistogram_ = newHistogram(
        "job_queued_duration_us", "Time jobs spent waiting in the queue (microseconds)");
    jobRunningDurationHistogram_ =
        newHistogram("job_running_duration_us", "Job execution time in microseconds");

    // Asynchronous instruments come last; they need meter_ to exist.
    registerAsyncGauges();

    JLOG(journal_.info()) << "MetricsRegistry: started successfully";
#else
    static_cast<void>(endpoint);
#endif  // XRPL_ENABLE_TELEMETRY
}
/** Flush pending metrics and tear down the export pipeline.

    Safe to call when the registry was never started and safe to call more
    than once: with no provider there is nothing to do. In builds without
    XRPL_ENABLE_TELEMETRY the function is empty.
*/
void
MetricsRegistry::stop()
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!provider_)
        return;

    JLOG(journal_.info()) << "MetricsRegistry: stopping";

    // Push out whatever has been recorded since the last periodic export.
    // A failed flush must not block shutdown, but it should be visible:
    // it means the final data points may never reach the collector.
    if (!provider_->ForceFlush())
    {
        JLOG(journal_.warn()) << "MetricsRegistry: final flush failed or timed out; "
                                 "some metrics may not have been exported";
    }

    // Release the provider. The design relies on this to shut down the
    // periodic reader so that observable gauge callbacks are no longer
    // invoked; no explicit RemoveCallback is issued.
    provider_.reset();

    JLOG(journal_.info()) << "MetricsRegistry: stopped";
#endif  // XRPL_ENABLE_TELEMETRY
}
// -----------------------------------------------------------------
// Synchronous instrument recording — RPC metrics (Task 9.4)
// -----------------------------------------------------------------
/** Count one started RPC call, labelled with its method name.

    No-op when the registry is disabled, when start() has not created the
    counter, or when the build has no XRPL_ENABLE_TELEMETRY.

    @param method RPC method name used as the "method" attribute.
*/
void
MetricsRegistry::recordRpcStarted(std::string_view method)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_ || !rpcStartedCounter_)
        return;

    // View the caller's buffer instead of copying it into a temporary
    // std::string: this runs on every RPC, and the attribute only has to
    // stay valid for the duration of the Add() call.
    opentelemetry::nostd::string_view const methodLabel{method.data(), method.size()};
    rpcStartedCounter_->Add(1, {{"method", methodLabel}});
#else
    (void)method;
#endif
}
/** Count one successfully finished RPC call and record its duration.

    No-op when the registry is disabled, when start() has not created the
    counter, or when the build has no XRPL_ENABLE_TELEMETRY.

    @param method     RPC method name used as the "method" attribute.
    @param durationUs Execution time in microseconds, recorded in the
                      rpc duration histogram.
*/
void
MetricsRegistry::recordRpcFinished(std::string_view method, std::int64_t durationUs)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_ || !rpcFinishedCounter_)
        return;

    // One non-owning view serves both instruments; the previous code built
    // two temporary std::string copies of the method name per call.
    opentelemetry::nostd::string_view const methodLabel{method.data(), method.size()};

    rpcFinishedCounter_->Add(1, {{"method", methodLabel}});
    if (rpcDurationHistogram_)
        rpcDurationHistogram_->Record(static_cast<double>(durationUs), {{"method", methodLabel}});
#else
    (void)method;
    (void)durationUs;
#endif
}
/** Count one errored RPC call and record its duration.

    The duration goes into the same histogram used for successful calls.
    No-op when the registry is disabled, when start() has not created the
    counter, or when the build has no XRPL_ENABLE_TELEMETRY.

    @param method     RPC method name used as the "method" attribute.
    @param durationUs Execution time in microseconds.
*/
void
MetricsRegistry::recordRpcErrored(std::string_view method, std::int64_t durationUs)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_ || !rpcErroredCounter_)
        return;

    // One non-owning view serves both instruments; the previous code built
    // two temporary std::string copies of the method name per call.
    opentelemetry::nostd::string_view const methodLabel{method.data(), method.size()};

    rpcErroredCounter_->Add(1, {{"method", methodLabel}});
    if (rpcDurationHistogram_)
        rpcDurationHistogram_->Record(static_cast<double>(durationUs), {{"method", methodLabel}});
#else
    (void)method;
    (void)durationUs;
#endif
}
// -----------------------------------------------------------------
// Synchronous instrument recording — Job Queue metrics (Task 9.5)
// -----------------------------------------------------------------
/** Count one enqueued job, labelled with its job type name.

    No-op when the registry is disabled, when start() has not created the
    counter, or when the build has no XRPL_ENABLE_TELEMETRY.

    @param jobType Job type name used as the "job_type" attribute.
*/
void
MetricsRegistry::recordJobQueued(std::string_view jobType)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_ || !jobQueuedCounter_)
        return;

    // View the caller's buffer instead of copying it into a temporary
    // std::string: this runs on every enqueue, and the attribute only has
    // to stay valid for the duration of the Add() call.
    opentelemetry::nostd::string_view const typeLabel{jobType.data(), jobType.size()};
    jobQueuedCounter_->Add(1, {{"job_type", typeLabel}});
#else
    (void)jobType;
#endif
}
/** Count one started job and record how long it waited in the queue.

    No-op when the registry is disabled, when start() has not created the
    counter, or when the build has no XRPL_ENABLE_TELEMETRY.

    @param jobType     Job type name used as the "job_type" attribute.
    @param queuedDurUs Queue wait time in microseconds, recorded in the
                       queued-duration histogram.
*/
void
MetricsRegistry::recordJobStarted(std::string_view jobType, std::int64_t queuedDurUs)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_ || !jobStartedCounter_)
        return;

    // One non-owning view serves both instruments; the previous code built
    // two temporary std::string copies of the job type name per call.
    opentelemetry::nostd::string_view const typeLabel{jobType.data(), jobType.size()};

    jobStartedCounter_->Add(1, {{"job_type", typeLabel}});
    if (jobQueuedDurationHistogram_)
        jobQueuedDurationHistogram_->Record(
            static_cast<double>(queuedDurUs), {{"job_type", typeLabel}});
#else
    (void)jobType;
    (void)queuedDurUs;
#endif
}
/** Count one finished job and record how long it ran.

    No-op when the registry is disabled, when start() has not created the
    counter, or when the build has no XRPL_ENABLE_TELEMETRY.

    @param jobType      Job type name used as the "job_type" attribute.
    @param runningDurUs Execution time in microseconds, recorded in the
                        running-duration histogram.
*/
void
MetricsRegistry::recordJobFinished(std::string_view jobType, std::int64_t runningDurUs)
{
#ifdef XRPL_ENABLE_TELEMETRY
    if (!enabled_ || !jobFinishedCounter_)
        return;

    // One non-owning view serves both instruments; the previous code built
    // two temporary std::string copies of the job type name per call.
    opentelemetry::nostd::string_view const typeLabel{jobType.data(), jobType.size()};

    jobFinishedCounter_->Add(1, {{"job_type", typeLabel}});
    if (jobRunningDurationHistogram_)
        jobRunningDurationHistogram_->Record(
            static_cast<double>(runningDurUs), {{"job_type", typeLabel}});
#else
    (void)jobType;
    (void)runningDurUs;
#endif
}
// -----------------------------------------------------------------
// Observable gauge callbacks (Tasks 9.1, 9.2, 9.3, 9.6, 9.7)
// -----------------------------------------------------------------
#ifdef XRPL_ENABLE_TELEMETRY
void
MetricsRegistry::registerAsyncGauges()
{
// --- Task 9.2: Cache hit rate and size gauges ---
cacheHitRateGauge_ =
meter_->CreateDoubleObservableGauge("cache_metrics", "Cache hit rates and sizes");
cacheHitRateGauge_->AddCallback(
[](opentelemetry::metrics::ObserverResult result, void* state) {
auto* self = static_cast<MetricsRegistry*>(state);
auto& app = self->app_;
try
{
// SLE cache hit rate (0.0 - 1.0).
auto sleRate = app.cachedSLEs().rate();
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(sleRate, {{"metric", "SLE_hit_rate"}});
// Ledger cache hit rate.
auto ledgerRate = app.getLedgerMaster().getCacheHitRate();
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(ledgerRate, {{"metric", "ledger_hit_rate"}});
// AcceptedLedger cache hit rate.
auto alRate = app.getAcceptedLedgerCache().getHitRate();
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(alRate, {{"metric", "AL_hit_rate"}});
// TreeNode cache size.
auto tnCacheSize = app.getNodeFamily().getTreeNodeCache()->getCacheSize();
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(
static_cast<double>(tnCacheSize), {{"metric", "treenode_cache_size"}});
// TreeNode track size.
auto tnTrackSize = app.getNodeFamily().getTreeNodeCache()->getTrackSize();
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(
static_cast<double>(tnTrackSize), {{"metric", "treenode_track_size"}});
// FullBelow cache size.
auto fbSize = app.getNodeFamily().getFullBelowCache()->size();
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(static_cast<double>(fbSize), {{"metric", "fullbelow_size"}});
}
catch (...)
{
// Silently skip if services are not yet ready.
}
},
this);
// --- Task 9.3: TxQ metrics gauges ---
txqGauge_ = meter_->CreateDoubleObservableGauge("txq_metrics", "Transaction queue metrics");
txqGauge_->AddCallback(
[](opentelemetry::metrics::ObserverResult result, void* state) {
auto* self = static_cast<MetricsRegistry*>(state);
auto& app = self->app_;
try
{
auto const metrics = app.getTxQ().getMetrics(*app.openLedger().current());
auto observe = [&](char const* name, double value) {
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(value, {{"metric", name}});
};
observe("txq_count", static_cast<double>(metrics.txCount));
observe(
"txq_max_size",
metrics.txQMaxSize ? static_cast<double>(*metrics.txQMaxSize) : 0.0);
observe("txq_in_ledger", static_cast<double>(metrics.txInLedger));
observe("txq_per_ledger", static_cast<double>(metrics.txPerLedger));
observe(
"txq_reference_fee_level",
static_cast<double>(metrics.referenceFeeLevel.fee()));
observe(
"txq_min_processing_fee_level",
static_cast<double>(metrics.minProcessingFeeLevel.fee()));
observe("txq_med_fee_level", static_cast<double>(metrics.medFeeLevel.fee()));
observe(
"txq_open_ledger_fee_level",
static_cast<double>(metrics.openLedgerFeeLevel.fee()));
}
catch (...)
{
// Silently skip if TxQ or OpenLedger are not yet ready.
}
},
this);
// --- Task 9.6: Counted object instance gauges ---
objectCountGauge_ = meter_->CreateInt64ObservableGauge(
"object_count", "Live instance counts for key internal object types");
objectCountGauge_->AddCallback(
[](opentelemetry::metrics::ObserverResult result, void* /* state */) {
try
{
// Iterate through all CountedObject types via the linked
// list in CountedObjects. We report all types with count
// > 0, filtering to the key types of interest.
auto counts = CountedObjects::getInstance().getCounts(0);
for (auto const& [name, count] : counts)
{
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<int64_t>>>(result)
->Observe(static_cast<int64_t>(count), {{"type", name}});
}
}
catch (...)
{
// Silently skip on error.
}
},
this);
// --- Task 9.7: Load factor breakdown gauges ---
loadFactorGauge_ =
meter_->CreateDoubleObservableGauge("load_factor_metrics", "Fee load factor breakdown");
loadFactorGauge_->AddCallback(
[](opentelemetry::metrics::ObserverResult result, void* state) {
auto* self = static_cast<MetricsRegistry*>(state);
auto& app = self->app_;
try
{
auto& feeTrack = app.getFeeTrack();
auto const loadBase = static_cast<double>(feeTrack.getLoadBase());
auto observe = [&](char const* name, double value) {
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<double>>>(result)
->Observe(value, {{"metric", name}});
};
// Combined load factor (server component).
observe(
"load_factor_server", static_cast<double>(feeTrack.getLoadFactor()) / loadBase);
// Individual factor components.
observe(
"load_factor_local", static_cast<double>(feeTrack.getLocalFee()) / loadBase);
observe("load_factor_net", static_cast<double>(feeTrack.getRemoteFee()) / loadBase);
observe(
"load_factor_cluster",
static_cast<double>(feeTrack.getClusterFee()) / loadBase);
// Fee escalation factors from TxQ.
auto const metrics = app.getTxQ().getMetrics(*app.openLedger().current());
auto refLevel = static_cast<double>(metrics.referenceFeeLevel.fee());
if (refLevel > 0)
{
observe(
"load_factor_fee_escalation",
static_cast<double>(metrics.openLedgerFeeLevel.fee()) / refLevel);
observe(
"load_factor_fee_queue",
static_cast<double>(metrics.minProcessingFeeLevel.fee()) / refLevel);
}
// Combined load factor (max of server and fee escalation).
auto const loadFactorServer = feeTrack.getLoadFactor();
auto const loadBaseServer = feeTrack.getLoadBase();
double combined = static_cast<double>(loadFactorServer) / loadBase;
if (refLevel > 0)
{
double feeEscalation = static_cast<double>(metrics.openLedgerFeeLevel.fee()) *
loadBaseServer / refLevel;
if (feeEscalation > static_cast<double>(loadFactorServer))
{
combined = feeEscalation / loadBase;
}
}
observe("load_factor", combined);
}
catch (...)
{
// Silently skip if services are not yet ready.
}
},
this);
// --- Task 9.1: NodeStore I/O gauges ---
// The cumulative counters (reads, writes, bytes) are also exposed here
// as observable gauges. This avoids adding an xrpld dependency into the
// libxrpl nodestore code — the MetricsRegistry reads the existing atomic
// counters from Database via its public accessors.
nodeStoreGauge_ = meter_->CreateInt64ObservableGauge(
"nodestore_state", "NodeStore I/O counters, queue depth, and write load");
nodeStoreGauge_->AddCallback(
[](opentelemetry::metrics::ObserverResult result, void* state) {
auto* self = static_cast<MetricsRegistry*>(state);
auto& app = self->app_;
try
{
auto& db = app.getNodeStore();
auto observe = [&](char const* name, int64_t value) {
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<int64_t>>>(result)
->Observe(value, {{"metric", name}});
};
// Cumulative counters (monotonically increasing).
observe("node_reads_total", static_cast<int64_t>(db.getFetchTotalCount()));
observe("node_reads_hit", static_cast<int64_t>(db.getFetchHitCount()));
observe("node_writes", static_cast<int64_t>(db.getStoreCount()));
observe("node_written_bytes", static_cast<int64_t>(db.getStoreSize()));
observe("node_read_bytes", static_cast<int64_t>(db.getFetchSize()));
// Write load score (instantaneous).
observe("write_load", static_cast<int64_t>(db.getWriteLoad()));
// Read queue depth (instantaneous).
Json::Value obj(Json::objectValue);
db.getCountsJson(obj);
if (obj.isMember("read_queue"))
{
observe("read_queue", static_cast<int64_t>(obj["read_queue"].asUInt()));
}
}
catch (...)
{
// Silently skip on error.
}
},
this);
}
#endif // XRPL_ENABLE_TELEMETRY
} // namespace telemetry
} // namespace xrpl

View File

@@ -0,0 +1,280 @@
#pragma once
/** Central OTel Metrics Registry for rippled.
Owns all OpenTelemetry metric instruments (counters, histograms,
observable gauges) that are NOT already covered by the beast::insight
StatsD pipeline. The instruments are created once in start() and exported
by the OTel PeriodicExportingMetricReader every 10 s (the interval is
currently fixed in MetricsRegistry::start(), not read from configuration).
When XRPL_ENABLE_TELEMETRY is **not** defined, this class compiles to a
lightweight no-op: start(), stop() and the record*() methods have empty
bodies.
Dependency / ownership diagram (ASCII):
Application
|
+-- MetricsRegistry (unique_ptr, created in setup(), started/stopped with telemetry)
|
+-- OTel MeterProvider (owns reader + exporter)
| |
| +-- PeriodicExportingMetricReader
| +-- OtlpHttpMetricExporter
|
+-- Counters / Histograms (synchronous instruments)
| +-- rpc_method_started_total
| +-- rpc_method_finished_total
| +-- rpc_method_errored_total
| +-- rpc_method_duration_us (Histogram)
| +-- job_queued_total
| +-- job_started_total
| +-- job_finished_total
| +-- job_queued_duration_us (Histogram)
| +-- job_running_duration_us (Histogram)
|
+-- Observable Gauges (async callbacks, polled by reader)
+-- Cache hit rates (SLE, ledger, AL)
+-- TreeNode / FullBelow sizes
+-- TxQ metrics
+-- CountedObject counts
+-- Load factor breakdown
+-- NodeStore I/O gauges
Control-flow for async gauges:
PeriodicExportingMetricReader (background thread, 10 s tick)
|
v
OTel SDK invokes registered ObservableGauge callbacks
|
v
Each callback reads current value from Application services
(e.g. app.getTxQ().getMetrics(), app.getFeeTrack().getLoadFactor())
|
v
Result set is exported via OTLP/HTTP to the collector
Control-flow for synchronous instruments:
PerfLogImp::rpcStart/rpcEnd/jobQueue/jobStart/jobFinish
|
v
MetricsRegistry::recordRpc*(method, ...) / recordJob*(type, ...)
|
v
OTel Counter::Add() or Histogram::Record()
|
v
Periodically flushed by the MetricReader
Example usage:
@code
// In Application::setup(), after telemetry_ is created:
metricsRegistry_ = std::make_unique<telemetry::MetricsRegistry>(
telemetry_->isEnabled(), app, journal);
// In Application::start(), after telemetry_->start():
metricsRegistry_->start(metricsEndpoint);
// In PerfLogImp::rpcStart():
if (auto* mr = app_.getMetricsRegistry())
mr->recordRpcStarted("server_info");
// In PerfLogImp::rpcEnd():
if (auto* mr = app_.getMetricsRegistry())
{
mr->recordRpcFinished("server_info", durationUs);
// or: mr->recordRpcErrored("server_info", durationUs);
}
// In PerfLogImp::jobQueue():
if (auto* mr = app_.getMetricsRegistry())
mr->recordJobQueued("ledgerData");
// Shutdown:
metricsRegistry_->stop();
@endcode
Caveats:
- The MetricsRegistry must be created AFTER the Telemetry object because
it reads isEnabled() to decide whether to initialize the OTel SDK.
- Observable gauge callbacks capture a reference to the Application; the
Application must outlive the MetricsRegistry (guaranteed because
MetricsRegistry is stopped before Application teardown).
- The object_count gauge reports every entry returned by
CountedObjects::getCounts(0), so a newly added CountedObject type appears
automatically as a new value of the "type" attribute.
- Adding a new synchronous instrument requires updating both the header
and the .cpp, then calling the new record*() method from the
instrumentation site.
*/
#include <xrpl/beast/utility/Journal.h>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#ifdef XRPL_ENABLE_TELEMETRY
#include <opentelemetry/metrics/meter.h>
#include <opentelemetry/metrics/meter_provider.h>
#include <opentelemetry/metrics/observer_result.h>
#include <opentelemetry/sdk/metrics/meter_provider.h>
#endif
namespace xrpl {
class ServiceRegistry;
namespace telemetry {
/** Owner of the OTel metric instruments declared below.

    Exposes record*() methods for the synchronous RPC and job-queue
    instruments and, when XRPL_ENABLE_TELEMETRY is defined, holds the
    SDK MeterProvider, the Meter, and the handles for the observable
    gauges.

    The object holds a reference to the ServiceRegistry passed at
    construction, so that ServiceRegistry must outlive this object.
*/
class MetricsRegistry
{
public:
    /** Construct a MetricsRegistry.
        @param enabled Whether OTel metric export is active. When false,
                       all methods become no-ops.
        @param app     Reference to the ServiceRegistry (Application) for
                       reading current metric values in gauge callbacks.
        @param journal Journal for log output.
    */
    MetricsRegistry(bool enabled, ServiceRegistry& app, beast::Journal journal);

    ~MetricsRegistry();

    /// Non-copyable, non-movable.
    /// The move operations are deleted explicitly (rather than relying on
    /// their implicit suppression by the deleted copy operations) so the
    /// contract is visible in the declaration. The class holds a reference
    /// member and const members, so assignment could not be defaulted anyway.
    MetricsRegistry(MetricsRegistry const&) = delete;
    MetricsRegistry&
    operator=(MetricsRegistry const&) = delete;
    MetricsRegistry(MetricsRegistry&&) = delete;
    MetricsRegistry&
    operator=(MetricsRegistry&&) = delete;

    /** Initialize the OTel metrics pipeline and register all instruments.
        @param endpoint OTLP/HTTP endpoint URL for metric export
                        (e.g. "http://localhost:4318/v1/metrics").
    */
    void
    start(std::string const& endpoint);

    /** Flush pending metrics and shut down the pipeline. */
    void
    stop();

    /** @return the value of the master enable flag.

        The flag is a const member, so the result is fixed for the lifetime
        of the object and does NOT change when start() or stop() is called.
        NOTE(review): presumably initialized from the constructor's
        `enabled` argument; confirm in the .cpp.
    */
    bool
    isEnabled() const noexcept
    {
        return enabled_;
    }

    // -----------------------------------------------------------------
    // Synchronous instrument recording (called from PerfLog hot paths)
    // -----------------------------------------------------------------

    /** Record an RPC method call start.
        @param method The RPC method name (e.g. "server_info").
    */
    void
    recordRpcStarted(std::string_view method);

    /** Record an RPC method call completion.
        @param method     The RPC method name.
        @param durationUs Execution time in microseconds.
    */
    void
    recordRpcFinished(std::string_view method, std::int64_t durationUs);

    /** Record an RPC method call error.
        @param method     The RPC method name.
        @param durationUs Execution time in microseconds.
    */
    void
    recordRpcErrored(std::string_view method, std::int64_t durationUs);

    /** Record a job enqueued event.
        @param jobType The job type name (e.g. "ledgerData").
    */
    void
    recordJobQueued(std::string_view jobType);

    /** Record a job start event.
        @param jobType     The job type name.
        @param queuedDurUs Time the job spent waiting in the queue (us).
    */
    void
    recordJobStarted(std::string_view jobType, std::int64_t queuedDurUs);

    /** Record a job finish event.
        @param jobType      The job type name.
        @param runningDurUs Execution time in microseconds.
    */
    void
    recordJobFinished(std::string_view jobType, std::int64_t runningDurUs);

private:
    /// Master enable flag; when false all methods are no-ops.
    bool const enabled_;

    /// Reference to Application services for gauge callbacks.
    ServiceRegistry& app_;

    /// Journal for logging.
    beast::Journal const journal_;

#ifdef XRPL_ENABLE_TELEMETRY
    /// The SDK MeterProvider that owns the export pipeline.
    std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider> provider_;

    /// The Meter used to create all instruments.
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::Meter> meter_;

    // --- Synchronous instruments (RPC) ---

    /// Counter: rpc_method_started_total{method="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<std::uint64_t>>
        rpcStartedCounter_;

    /// Counter: rpc_method_finished_total{method="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<std::uint64_t>>
        rpcFinishedCounter_;

    /// Counter: rpc_method_errored_total{method="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<std::uint64_t>>
        rpcErroredCounter_;

    /// Histogram: rpc_method_duration_us{method="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Histogram<double>>
        rpcDurationHistogram_;

    // --- Synchronous instruments (Job Queue) ---

    /// Counter: job_queued_total{job_type="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<std::uint64_t>>
        jobQueuedCounter_;

    /// Counter: job_started_total{job_type="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<std::uint64_t>>
        jobStartedCounter_;

    /// Counter: job_finished_total{job_type="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Counter<std::uint64_t>>
        jobFinishedCounter_;

    /// Histogram: job_queued_duration_us{job_type="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Histogram<double>>
        jobQueuedDurationHistogram_;

    /// Histogram: job_running_duration_us{job_type="<name>"}
    opentelemetry::nostd::unique_ptr<opentelemetry::metrics::Histogram<double>>
        jobRunningDurationHistogram_;

    // --- Observable gauges (registered via callbacks) ---
    // Handles are stored so we can remove callbacks on shutdown.

    /// Observable gauges for cache hit rates and sizes.
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
        cacheHitRateGauge_;

    /// Observable gauges for TxQ metrics.
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> txqGauge_;

    /// Observable gauges for counted object instances.
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
        objectCountGauge_;

    /// Observable gauges for load factor breakdown.
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
        loadFactorGauge_;

    /// Observable gauges for NodeStore write_load and read_queue.
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
        nodeStoreGauge_;

    /** Register all observable gauge callbacks with the OTel SDK.
        Called once during start().
    */
    void
    registerAsyncGauges();
#endif  // XRPL_ENABLE_TELEMETRY
};
} // namespace telemetry
} // namespace xrpl