diff --git a/src/xrpld/app/main/Application.cpp b/src/xrpld/app/main/Application.cpp
index 8d4c620fae..44a280af85 100644
--- a/src/xrpld/app/main/Application.cpp
+++ b/src/xrpld/app/main/Application.cpp
@@ -1686,6 +1686,19 @@ ApplicationImp::run()
     // The order of these stop calls is delicate.
     // Re-ordering them risks undefined behavior.
     m_loadManager->stop();
+
+    // Detach MetricsRegistry observable-gauge callbacks BEFORE stopping
+    // any service the callbacks read from. The callbacks run on the OTel
+    // reader thread and touch nodeStore_, overlay_, networkOPs_,
+    // ledgerMaster, inboundLedgers, etc. A final tick that fires after
+    // one of those services has shut down would dereference dangling
+    // state. detachCallbacks() flips an atomic flag every callback
+    // acquire-loads at its entry, so subsequent ticks become no-ops.
+    // The final provider teardown still happens in metricsRegistry_->stop()
+    // farther down.
+    if (metricsRegistry_)
+        metricsRegistry_->detachCallbacks();
+
     m_shaMapStore->stop();
     m_jobQueue->stop();
     if (overlay_)
diff --git a/src/xrpld/telemetry/MetricsRegistry.cpp b/src/xrpld/telemetry/MetricsRegistry.cpp
index d13486f07c..2e348f58c0 100644
--- a/src/xrpld/telemetry/MetricsRegistry.cpp
+++ b/src/xrpld/telemetry/MetricsRegistry.cpp
@@ -176,6 +176,15 @@ MetricsRegistry::start(std::string const& endpoint, std::string const& instanceI
 #endif // XRPL_ENABLE_TELEMETRY
 }

+void
+MetricsRegistry::detachCallbacks() noexcept
+{
+#ifdef XRPL_ENABLE_TELEMETRY
+    // Release so every subsequent callback acquire-load sees true.
+    callbacksDetached_.store(true, std::memory_order_release);
+#endif // XRPL_ENABLE_TELEMETRY
+}
+
 void
 MetricsRegistry::stop()
 {
@@ -185,10 +194,16 @@ MetricsRegistry::stop()

     JLOG(journal_.info()) << "MetricsRegistry: stopping";

+    // Belt-and-suspenders: detachCallbacks() should have already been
+    // called by Application shutdown before any service the callbacks
+    // observe was stopped. Setting the flag here is redundant for a
+    // correct caller but protects against a future caller that forgets
+    // to detach first.
+    callbacksDetached_.store(true, std::memory_order_release);
+
     // Force-flush any pending metrics, then destroy the provider.
     // This stops the PeriodicExportingMetricReader, which in turn
-    // stops invoking observable gauge callbacks. No explicit
-    // RemoveCallback is needed — the provider destruction handles it.
+    // stops invoking observable gauge callbacks.
     provider_->ForceFlush();
     provider_.reset();

@@ -344,6 +359,8 @@ MetricsRegistry::registerCacheHitRateGauge()
     cacheHitRateGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -413,6 +430,8 @@ MetricsRegistry::registerTxqGauge()
     txqGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -457,7 +476,10 @@ MetricsRegistry::registerObjectCountGauge()
     objectCountGauge_ = meter_->CreateInt64ObservableGauge(
         "xrpld_object_count", "Live instance counts for key internal object types");
     objectCountGauge_->AddCallback(
-        [](opentelemetry::metrics::ObserverResult result, void* /* state */) {
+        [](opentelemetry::metrics::ObserverResult result, void* state) {
+            auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             try
             {
                 // Iterate through all CountedObject types via the linked
@@ -488,6 +510,8 @@ MetricsRegistry::registerLoadFactorGauge()
     loadFactorGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -562,6 +586,8 @@ MetricsRegistry::registerNodeStoreGauge()
     nodeStoreGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -633,6 +659,8 @@ MetricsRegistry::registerServerInfoGauge()
     serverInfoGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -721,6 +749,8 @@ MetricsRegistry::registerCompleteLedgersGauge()
     completeLedgersGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -778,6 +808,8 @@ MetricsRegistry::registerDbMetricsGauge()
     dbMetricsGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -815,6 +847,8 @@ MetricsRegistry::registerValidatorHealthGauge()
     validatorHealthGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -862,6 +896,8 @@ MetricsRegistry::registerPeerQualityGauge()
     peerQualityGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -941,6 +977,8 @@ MetricsRegistry::registerLedgerEconomyGauge()
     ledgerEconomyGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -999,6 +1037,8 @@ MetricsRegistry::registerStateTrackingGauge()
     stateTrackingGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -1046,6 +1086,8 @@ MetricsRegistry::registerStorageDetailGauge()
     storageDetailGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -1083,6 +1125,8 @@ MetricsRegistry::registerValidationAgreementGauge()
     validationAgreementGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;

             try
             {
diff --git a/src/xrpld/telemetry/MetricsRegistry.h b/src/xrpld/telemetry/MetricsRegistry.h
index 4d2cf11d1b..9a59fb28dd 100644
--- a/src/xrpld/telemetry/MetricsRegistry.h
+++ b/src/xrpld/telemetry/MetricsRegistry.h
@@ -129,6 +129,7 @@

 #include

+#include <atomic>
 #include
 #include
 #include
@@ -231,7 +232,30 @@ public:
     void
     start(std::string const& endpoint, std::string const& instanceId = {});

-    /** Flush pending metrics and shut down the pipeline. */
+    /** Detach all ObservableGauge callbacks so they no-op on the next
+        reader-thread tick.
+
+        Must be called BEFORE any Application service that the callbacks
+        read (nodeStore, overlay, networkOPs, ledgerMaster, etc.) is
+        stopped. Every callback acquire-loads the flag on entry, so a
+        callback that starts after `detachCallbacks()` returns is a no-op.
+        This call does NOT wait for a callback that already passed that
+        check; such an in-flight callback may still read those services.
+
+        Idempotent. Safe to call multiple times. Nothing in this change
+        resets the flag, so a call made before `start()` leaves every
+        gauge callback disabled. Provider shutdown stays in `stop()`.
+    */
+    void
+    detachCallbacks() noexcept;
+
+    /** Flush pending metrics and shut down the pipeline.
+
+        @pre `detachCallbacks()` should have been called earlier in the
+        shutdown sequence; otherwise there is a narrow race between
+        the final reader-thread tick and the destruction of
+        Application services that the gauge callbacks read from.
+    */
     void
     stop();

@@ -354,6 +378,14 @@ private:

     /// Journal for logging.
     beast::Journal const journal_;
+
+    /// Set by detachCallbacks() during shutdown so every ObservableGauge
+    /// callback returns early before reading Application services that
+    /// may already be stopped. Checked with memory_order_acquire at the
+    /// top of each callback to pair with the memory_order_release store
+    /// in detachCallbacks().
+    std::atomic<bool> callbacksDetached_{false};
+
     /// The SDK MeterProvider that owns the export pipeline.
     std::shared_ptr provider_;
