mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-03 08:46:46 +00:00
fix(telemetry): detach metrics gauge callbacks before Application services stop
MetricsRegistry observable-gauge callbacks run on the OTel reader thread and read live state from nodeStore_, overlay_, networkOPs_, ledgerMaster, inboundLedgers, loadManager, and others. The old shutdown sequence called metricsRegistry_->stop() AFTER all those services were already stopped, which left a race window between each service's stop() and the final provider_->ForceFlush() during which a callback could dereference already-stopped service state. The try/catch guards in each callback mitigated crashes but not reads from freed members.

- Add MetricsRegistry::detachCallbacks() that sets an atomic<bool> callbacksDetached_ with release ordering. Idempotent.
- Guard every ObservableGauge callback entry with an acquire-load of the same flag and return early if it is set. Covers all 15 registered callbacks (cacheHitRate, txq, objectCount, loadFactor, nodeStore, serverInfo, buildInfo, completeLedgers, dbMetrics, validatorHealth, peerQuality, ledgerEconomy, stateTracking, storageDetail, validationAgreement).
- Application::run() shutdown sequence now calls metricsRegistry_->detachCallbacks() right after m_loadManager->stop() and BEFORE m_shaMapStore, m_jobQueue, overlay_, grpcServer_, m_networkOPs, serverHandler_, m_ledgerReplayer, m_inboundTransactions, m_inboundLedgers, ledgerCleaner_, m_nodeStore, perfLog_ are stopped. The acquire/release pair guarantees subsequent reader-thread ticks see the detach before they dereference stopped services.
- metricsRegistry_->stop() keeps setting the flag as a belt-and-suspenders defense in case a future caller forgets to detach first.
- Drop the misleading "No explicit RemoveCallback is needed" comment from stop(); provider destruction alone does not beat the reader thread to already-freed state.

The objectCountGauge callback previously discarded its state pointer via `void* /* state */`; restore the state argument so it can access self->callbacksDetached_ too.
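
The guard described in the commit message can be illustrated with a minimal, self-contained sketch. It is not the rippled implementation: `GuardedRegistry`, `gaugeCallback`, `setServiceValue`, and `serviceValue_` are hypothetical names standing in for MetricsRegistry, an ObservableGauge callback, and the service state a callback reads. Only the flag name and the release/acquire orderings are taken from the commit, and provider teardown is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for MetricsRegistry; only the flag handling mirrors
// the commit. serviceValue_ plays the role of state owned by a service.
class GuardedRegistry
{
public:
    // Release store: publishes the detach to later acquire-loads. Idempotent.
    void
    detachCallbacks() noexcept
    {
        callbacksDetached_.store(true, std::memory_order_release);
    }

    // Also sets the flag, so a caller that skipped detachCallbacks() is
    // still covered. The real provider teardown is omitted in this sketch.
    void
    stop() noexcept
    {
        callbacksDetached_.store(true, std::memory_order_release);
    }

    void
    setServiceValue(std::int64_t value) noexcept
    {
        serviceValue_ = value;
    }

    // C-style callback taking an opaque state pointer, as in the diff.
    // Returns true if it read the service value, false if it returned early.
    static bool
    gaugeCallback(std::int64_t& out, void* state)
    {
        assert(state != nullptr);
        auto* self = static_cast<GuardedRegistry*>(state);
        if (self->callbacksDetached_.load(std::memory_order_acquire))
            return false;
        out = self->serviceValue_;
        return true;
    }

private:
    std::atomic<bool> callbacksDetached_{false};
    std::int64_t serviceValue_{0};
};
```

Calling `GuardedRegistry::gaugeCallback(out, &registry)` before `detachCallbacks()` reads the value; after `detachCallbacks()` or `stop()` it returns early and leaves `out` untouched. Note that the flag only affects invocations that start after the store: a callback that has already passed the check runs to completion.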
@@ -1686,6 +1686,19 @@ ApplicationImp::run()
     // The order of these stop calls is delicate.
     // Re-ordering them risks undefined behavior.
     m_loadManager->stop();
+
+    // Detach MetricsRegistry observable-gauge callbacks BEFORE stopping
+    // any service the callbacks read from. The callbacks run on the OTel
+    // reader thread and touch nodeStore_, overlay_, networkOPs_,
+    // ledgerMaster, inboundLedgers, etc. A final tick that fires after
+    // one of those services has shut down would dereference dangling
+    // state. detachCallbacks() flips an atomic flag every callback
+    // acquire-loads at its entry, so subsequent ticks become no-ops.
+    // The final provider teardown still happens in metricsRegistry_->stop()
+    // farther down.
+    if (metricsRegistry_)
+        metricsRegistry_->detachCallbacks();
+
     m_shaMapStore->stop();
     m_jobQueue->stop();
     if (overlay_)
@@ -176,6 +176,15 @@ MetricsRegistry::start(std::string const& endpoint, std::string const& instanceI
 #endif // XRPL_ENABLE_TELEMETRY
 }

+void
+MetricsRegistry::detachCallbacks() noexcept
+{
+#ifdef XRPL_ENABLE_TELEMETRY
+    // Release so every subsequent callback acquire-load sees true.
+    callbacksDetached_.store(true, std::memory_order_release);
+#endif // XRPL_ENABLE_TELEMETRY
+}
+
 void
 MetricsRegistry::stop()
 {
@@ -185,10 +194,16 @@ MetricsRegistry::stop()

     JLOG(journal_.info()) << "MetricsRegistry: stopping";

+    // Belt-and-suspenders: detachCallbacks() should have already been
+    // called by Application shutdown before any service the callbacks
+    // observe was stopped. Setting the flag here is redundant for a
+    // correct caller but protects against a future caller that forgets
+    // to detach first.
+    callbacksDetached_.store(true, std::memory_order_release);
+
     // Force-flush any pending metrics, then destroy the provider.
     // This stops the PeriodicExportingMetricReader, which in turn
-    // stops invoking observable gauge callbacks. No explicit
-    // RemoveCallback is needed — the provider destruction handles it.
+    // stops invoking observable gauge callbacks.
     provider_->ForceFlush();
     provider_.reset();

@@ -344,6 +359,8 @@ MetricsRegistry::registerCacheHitRateGauge()
     cacheHitRateGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -413,6 +430,8 @@ MetricsRegistry::registerTxqGauge()
     txqGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -457,7 +476,10 @@ MetricsRegistry::registerObjectCountGauge()
     objectCountGauge_ = meter_->CreateInt64ObservableGauge(
         "xrpld_object_count", "Live instance counts for key internal object types");
     objectCountGauge_->AddCallback(
-        [](opentelemetry::metrics::ObserverResult result, void* /* state */) {
+        [](opentelemetry::metrics::ObserverResult result, void* state) {
+            auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             try
             {
                 // Iterate through all CountedObject types via the linked
@@ -488,6 +510,8 @@ MetricsRegistry::registerLoadFactorGauge()
     loadFactorGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -562,6 +586,8 @@ MetricsRegistry::registerNodeStoreGauge()
     nodeStoreGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -633,6 +659,8 @@ MetricsRegistry::registerServerInfoGauge()
     serverInfoGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -721,6 +749,8 @@ MetricsRegistry::registerCompleteLedgersGauge()
     completeLedgersGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -778,6 +808,8 @@ MetricsRegistry::registerDbMetricsGauge()
     dbMetricsGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -815,6 +847,8 @@ MetricsRegistry::registerValidatorHealthGauge()
     validatorHealthGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -862,6 +896,8 @@ MetricsRegistry::registerPeerQualityGauge()
     peerQualityGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -941,6 +977,8 @@ MetricsRegistry::registerLedgerEconomyGauge()
     ledgerEconomyGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -999,6 +1037,8 @@ MetricsRegistry::registerStateTrackingGauge()
     stateTrackingGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -1046,6 +1086,8 @@ MetricsRegistry::registerStorageDetailGauge()
     storageDetailGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;
             auto& app = self->app_;

             try
@@ -1083,6 +1125,8 @@ MetricsRegistry::registerValidationAgreementGauge()
     validationAgreementGauge_->AddCallback(
         [](opentelemetry::metrics::ObserverResult result, void* state) {
             auto* self = static_cast<MetricsRegistry*>(state);
+            if (self->callbacksDetached_.load(std::memory_order_acquire))
+                return;

             try
             {
@@ -129,6 +129,7 @@

 #include <xrpl/beast/utility/Journal.h>

+#include <atomic>
 #include <cstdint>
 #include <memory>
 #include <string>
@@ -231,7 +232,30 @@ public:
     void
     start(std::string const& endpoint, std::string const& instanceId = {});

-    /** Flush pending metrics and shut down the pipeline. */
+    /** Detach all ObservableGauge callbacks so they no-op on the next
+        reader-thread tick.
+
+        Must be called BEFORE any Application service that the callbacks
+        read (nodeStore, overlay, networkOPs, ledgerMaster, etc.) is
+        stopped. The flag is checked with acquire ordering at the top of
+        every callback; together with the release store here it
+        guarantees that once `detachCallbacks()` returns, no subsequent
+        callback invocation will dereference an already-stopped service.
+
+        Idempotent. Safe to call multiple times. Safe to call before
+        `start()` (has no effect). The actual SDK-level provider
+        shutdown still happens in `stop()`.
+     */
+    void
+    detachCallbacks() noexcept;
+
+    /** Flush pending metrics and shut down the pipeline.
+
+        @pre `detachCallbacks()` should have been called earlier in the
+             shutdown sequence; otherwise there is a narrow race between
+             the final reader-thread tick and the destruction of
+             Application services that the gauge callbacks read from.
+     */
     void
     stop();

@@ -354,6 +378,14 @@ private:

     /// Journal for logging.
     beast::Journal const journal_;
+
+    /// Set by detachCallbacks() during shutdown so every ObservableGauge
+    /// callback returns early before reading Application services that
+    /// may already be stopped. Checked with memory_order_acquire at the
+    /// top of each callback to pair with the memory_order_release store
+    /// in detachCallbacks().
+    std::atomic<bool> callbacksDetached_{false};
+
     /// The SDK MeterProvider that owns the export pipeline.
     std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider> provider_;
