fix(telemetry): address code review issues in OTelCollector

- Fix use-after-free: extract gauge callback to static function and call
  RemoveCallback in ~OTelGaugeImpl() before unregistering from collector
- Use memory_order_acq_rel on callHooks() debounce CAS for proper
  happens-before relationship between hook invocations
- Add explicit 2s timeout to ForceFlush() in destructor to prevent
  blocking indefinitely when OTLP endpoint is unreachable at shutdown
- Add OTLP receiver to metrics pipeline so native OTel metrics from
  xrpld are actually received by the collector
- Remove stale health check port from docker-compose (extension was
  removed from collector config)
- Clarify fallback docs: StatsD path requires re-enabling receiver/port
- Fix comments: Counter uses uint64_t not int64_t, gauge clamps to
  [0, INT64_MAX] not [0, UINT64_MAX]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-05-06 14:24:52 +01:00
parent ed31bab500
commit 761688383d
5 changed files with 28 additions and 28 deletions

View File

@@ -27,7 +27,6 @@ services:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP (traces + native OTel metrics)
- "8889:8889" # Prometheus metrics (spanmetrics + OTLP)
- "13133:13133" # Health check
# StatsD UDP port removed — beast::insight now uses native OTLP.
# Uncomment if using server=statsd fallback:
# - "8125:8125/udp"

View File

@@ -2,7 +2,7 @@
#
# Pipelines:
# traces: OTLP receiver -> batch processor -> debug + Tempo + spanmetrics
# metrics: spanmetrics connector -> Prometheus exporter
# metrics: OTLP receiver + spanmetrics connector -> Prometheus exporter
#
# xrpld sends traces via OTLP/HTTP to port 4318. The collector batches
# them, forwards to Tempo, and derives RED metrics via the spanmetrics
@@ -55,5 +55,5 @@ service:
processors: [batch]
exporters: [debug, otlp/tempo, spanmetrics]
metrics:
receivers: [spanmetrics]
receivers: [otlp, spanmetrics]
exporters: [prometheus]

View File

@@ -293,7 +293,7 @@ prefix=xrpld
The `OTelCollector` implementation exports metrics via OTLP/HTTP to the same OTel Collector that receives traces. No separate StatsD receiver is needed.
> **Fallback**: Set `server=statsd` and `address=127.0.0.1:8125` to use the legacy StatsD UDP path during the transition period.
> **Fallback**: Set `server=statsd` and `address=127.0.0.1:8125` to use the legacy StatsD UDP path. This requires re-enabling the `statsd` receiver in `otel-collector-config.yaml` and uncommenting port 8125 in `docker-compose.yml`.
### Metric Reference

View File

@@ -49,7 +49,7 @@ namespace insight {
* Replaces StatsD-based metric collection with native OTel Metrics SDK
* instruments. Each beast::insight instrument maps to an OTel equivalent:
*
* - Counter -> OTel Counter<int64_t>
* - Counter -> OTel Counter<uint64_t>
* - Gauge -> OTel ObservableGauge<int64_t> (async callback)
* - Event -> OTel Histogram<double> (duration in milliseconds)
* - Meter -> OTel Counter<uint64_t> (monotonic, unsigned)

View File

@@ -17,7 +17,7 @@
* OTelCounterImpl / OTelGaugeImpl / OTelEventImpl / OTelMeterImpl
* | | | |
* v v v v
* OTel Counter<uint64> ObservableGauge Histogram<double> Counter<uint64>
* Counter<uint64_t> ObservableGauge Histogram<double> Counter<uint64_t>
* | | | |
* +--------------------+----------------+--------------+
* |
@@ -123,7 +123,7 @@ private:
/**
* @brief OTel-backed implementation of beast::insight::CounterImpl.
*
* Wraps an OTel Counter<int64_t> instrument. Each increment() call
* Wraps an OTel Counter<uint64_t> instrument. Each increment() call
* is forwarded directly to the OTel counter's Add() method. The
* PeriodicMetricReader collects and exports the accumulated delta.
*
@@ -239,7 +239,7 @@ public:
/**
* @brief Increment (or decrement) the gauge by a signed amount.
*
* Clamps the result to [0, UINT64_MAX] to match StatsDGaugeImpl
* Clamps the result to [0, INT64_MAX] to match StatsDGaugeImpl
* behavior.
*
* @param amount Signed amount to add to the current value.
@@ -254,6 +254,10 @@ public:
int64_t
currentValue() const;
/** Static callback registered with the OTel SDK observable gauge. */
static void
gaugeCallback(opentelemetry::metrics::ObserverResult result, void* state);
private:
OTelGaugeImpl&
operator=(OTelGaugeImpl const&);
@@ -578,27 +582,25 @@ OTelGaugeImpl::OTelGaugeImpl(
: m_gauge(meter->CreateInt64ObservableGauge(name)), m_collector(collector)
{
m_collector->addGauge(this);
m_gauge->AddCallback(gaugeCallback, this);
}
// Register the async callback that the SDK calls during collection.
// Before reading the gauge value, invoke all registered hooks so that
// hook handlers (e.g. NetworkOPs State_Accounting) have a chance to
// update gauge values. callHooks() uses a debounce timestamp so hooks
// run at most once per collection cycle even with many gauges.
m_gauge->AddCallback(
[](opentelemetry::metrics::ObserverResult result, void* state) {
auto* self = static_cast<OTelGaugeImpl*>(state);
self->m_collector->callHooks();
if (auto intResult = opentelemetry::nostd::get_if<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<int64_t>>>(&result))
{
(*intResult)->Observe(self->currentValue());
}
},
this);
void
OTelGaugeImpl::gaugeCallback(opentelemetry::metrics::ObserverResult result, void* state)
{
auto* self = static_cast<OTelGaugeImpl*>(state);
self->m_collector->callHooks();
if (auto intResult = opentelemetry::nostd::get_if<
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<int64_t>>>(
&result))
{
(*intResult)->Observe(self->currentValue());
}
}
OTelGaugeImpl::~OTelGaugeImpl()
{
m_gauge->RemoveCallback(gaugeCallback, this);
m_collector->removeGauge(this);
}
@@ -720,8 +722,7 @@ OTelCollectorImp::~OTelCollectorImp()
m_journal.info() << "OTelCollector shutting down";
if (m_provider)
{
// ForceFlush to export any pending metrics before shutdown.
m_provider->ForceFlush();
m_provider->ForceFlush(std::chrono::milliseconds(2000));
m_provider->Shutdown();
}
if (m_journal.info())
@@ -783,10 +784,10 @@ OTelCollectorImp::callHooks()
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
auto last = m_lastHookCallMs.load(std::memory_order_relaxed);
auto last = m_lastHookCallMs.load(std::memory_order_acquire);
if (now - last < 500)
return;
if (!m_lastHookCallMs.compare_exchange_strong(last, now, std::memory_order_relaxed))
if (!m_lastHookCallMs.compare_exchange_strong(last, now, std::memory_order_acq_rel))
return; // Another thread won the race.
std::lock_guard lock(m_mutex);