From 4c11ebbfc0f34fea71c9a6c8e6c4d471cfb6eac8 Mon Sep 17 00:00:00 2001 From: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com> Date: Tue, 31 Mar 2026 12:35:30 +0100 Subject: [PATCH] feat(telemetry): add ValidationTracker for validation agreement tracking (Task 7.8) Standalone class that tracks whether this validator's validations agree with network consensus, maintaining rolling 1h/24h windows and lifetime totals with a late-repair mechanism for out-of-order arrivals. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/xrpld/telemetry/ValidationTracker.h | 276 ++++++++++++++++++ .../telemetry/detail/ValidationTracker.cpp | 216 ++++++++++++++ 2 files changed, 492 insertions(+) create mode 100644 src/xrpld/telemetry/ValidationTracker.h create mode 100644 src/xrpld/telemetry/detail/ValidationTracker.cpp diff --git a/src/xrpld/telemetry/ValidationTracker.h b/src/xrpld/telemetry/ValidationTracker.h new file mode 100644 index 0000000000..462f309ff4 --- /dev/null +++ b/src/xrpld/telemetry/ValidationTracker.h @@ -0,0 +1,276 @@ +#pragma once + +/** @file ValidationTracker.h + Standalone validation agreement tracker for telemetry. +*/ + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace xrpl { +namespace telemetry { + +/** + * Tracks whether this validator's validations agree with network consensus, + * maintaining rolling 1-hour and 24-hour windows plus lifetime totals. + * + * The tracker operates by recording two independent events per ledger: + * 1. "We validated" -- our node published a validation for a ledger hash. + * 2. "Network validated" -- the network reached consensus on a ledger hash. + * + * After a configurable grace period (kGracePeriod), the reconcile() method + * compares the two flags. If both are set the ledger is counted as an + * "agreement"; otherwise it is a "miss". A late-repair mechanism allows a + * miss to be upgraded to an agreement if matching evidence arrives within + * kLateRepairWindow. + * + * Architecture / dependency diagram: + * @code + * +--------------------------+ + * | ConsensusAdapter / | + * | ValidatorSite | + * | (callers) | + * +---+-------------+-------+ + * | | + * | recordOur | recordNetwork + * | Validation | Validation + * v v + * +---------------------------+ + * | ValidationTracker | + * |---------------------------| + * | pending_ (hash_map) |----> LedgerEvent per hash + * | window1h_ (deque) |----> WindowEvent sliding window + * | window24h_ (deque) |----> WindowEvent sliding window + * | atomic totals | + * +---------------------------+ + * | + * | reconcile() called periodically + * v + * agreement / miss counters updated + * @endcode + * + * Usage -- basic recording and querying: + * @code + * xrpl::telemetry::ValidationTracker tracker; + * + * // On local validation: + * tracker.recordOurValidation(ledgerHash, seq); + * + * // On network consensus: + * tracker.recordNetworkValidation(ledgerHash, seq); + * + * // Periodically (e.g. every few seconds): + * tracker.reconcile(); + * + * // Query agreement percentage: + * double pct = tracker.agreementPct1h(); + * @endcode + * + * Usage -- edge case with late arrival: + * @code + * xrpl::telemetry::ValidationTracker tracker; + * + * // Network validates first, our validation arrives late: + * tracker.recordNetworkValidation(hash, seq); + * tracker.reconcile(); // initially counted as a miss + * + * // Late local validation arrives within repair window: + * tracker.recordOurValidation(hash, seq); + * tracker.reconcile(); // repaired to agreement + * @endcode + * + * @note Thread-safety: all public methods are thread-safe. The pending_ + * map and sliding-window deques are protected by mutex_. Lifetime totals + * use std::atomic for lock-free reads. + */ +class ValidationTracker +{ +public: + /// Monotonic clock used for all internal timestamps. + using Clock = std::chrono::steady_clock; + + /// Time point type from the monotonic clock. + using TimePoint = Clock::time_point; + + /** + * Record that this node sent a validation for the given ledger. + * @param ledgerHash Hash of the ledger we validated. + * @param seq Ledger sequence number. + */ + void + recordOurValidation(uint256 const& ledgerHash, LedgerIndex seq); + + /** + * Record that the network reached consensus on the given ledger. + * @param ledgerHash Hash of the network-validated ledger. + * @param seq Ledger sequence number. + */ + void + recordNetworkValidation(uint256 const& ledgerHash, LedgerIndex seq); + + /** + * Reconcile pending ledger events whose grace period has elapsed. + * Should be called periodically (e.g. every few seconds). Moves + * reconciled events into the sliding windows and updates totals. + * Also performs late-repair and eviction of stale data. + */ + void + reconcile(); + + /** @name Rolling-window percentage getters */ + /** @{ */ + + /** Agreement percentage over the last 1 hour. + * @return Percentage [0.0, 100.0], or 0.0 if no data. + */ + double + agreementPct1h() const; + + /** Agreement percentage over the last 24 hours. + * @return Percentage [0.0, 100.0], or 0.0 if no data. + */ + double + agreementPct24h() const; + + /** @} */ + + /** @name Rolling-window count getters */ + /** @{ */ + + /** Number of agreements in the 1-hour window. */ + uint64_t + agreements1h() const; + + /** Number of misses in the 1-hour window. */ + uint64_t + missed1h() const; + + /** Number of agreements in the 24-hour window. */ + uint64_t + agreements24h() const; + + /** Number of misses in the 24-hour window. */ + uint64_t + missed24h() const; + + /** @} */ + + /** @name Lifetime totals (atomic, lock-free reads) */ + /** @{ */ + + /** Total agreements since process start. */ + uint64_t + totalAgreements() const; + + /** Total misses since process start. */ + uint64_t + totalMissed() const; + + /** Total validations this node sent. */ + uint64_t + totalValidationsSent() const; + + /** Total network validations observed for comparison. */ + uint64_t + totalValidationsChecked() const; + + /** @} */ + +private: + /** + * Per-ledger tracking state held in the pending map. + */ + struct LedgerEvent + { + uint256 ledgerHash; ///< Ledger hash being tracked. + LedgerIndex seq; ///< Ledger sequence number. + TimePoint recordTime; ///< Time the event was first recorded. + bool weValidated = false; ///< True if we sent a validation. + bool networkValidated = false; ///< True if network reached consensus. + bool reconciled = false; ///< True once grace period elapsed. + bool agreed = false; ///< True if both flags set at reconcile. + }; + + /** + * Lightweight event stored in the sliding-window deques. + */ + struct WindowEvent + { + TimePoint time; ///< When the event was reconciled. + uint256 ledgerHash; ///< Ledger hash for late-repair matching. + bool agreed; ///< Whether this was an agreement. + }; + + /// Grace period before reconciling a ledger event. + static constexpr auto kGracePeriod = std::chrono::seconds(8); + + /// Window during which a missed event can be repaired. + static constexpr auto kLateRepairWindow = std::chrono::minutes(5); + + /// Maximum number of pending (unreconciled + recently reconciled) events. + static constexpr std::size_t kMaxPendingEvents = 1000; + + /// Duration of the short rolling window. + static constexpr auto kWindow1h = std::chrono::hours(1); + + /// Duration of the long rolling window. + static constexpr auto kWindow24h = std::chrono::hours(24); + + /// Protects pending_, window1h_, and window24h_. + mutable std::mutex mutex_; + + /// Pending ledger events indexed by ledger hash. + hash_map pending_; + + /// Sliding window of reconciled events (last 1 hour). + std::deque window1h_; + + /// Sliding window of reconciled events (last 24 hours). + std::deque window24h_; + + /// Lifetime count of agreements. + std::atomic totalAgreements_{0}; + + /// Lifetime count of misses. + std::atomic totalMissed_{0}; + + /// Lifetime count of validations this node sent. + std::atomic totalValidationsSent_{0}; + + /// Lifetime count of network validations observed. + std::atomic totalValidationsChecked_{0}; + + /** + * Remove entries older than their respective window durations. + * @param now Current time point. + */ + void + evictStaleWindows(TimePoint now); + + /** + * Remove reconciled pending entries older than the late-repair window. + * Also trims the map if it exceeds kMaxPendingEvents. + * @param now Current time point. + */ + void + evictOldPending(TimePoint now); + + /** + * Scan a window deque and flip the first non-agreed entry matching + * the given ledger hash to agreed. + * @param window The sliding-window deque to repair. + * @param hash Ledger hash to match. + */ + static void + repairWindowEntry(std::deque& window, uint256 const& hash); +}; + +} // namespace telemetry +} // namespace xrpl diff --git a/src/xrpld/telemetry/detail/ValidationTracker.cpp b/src/xrpld/telemetry/detail/ValidationTracker.cpp new file mode 100644 index 0000000000..cd8133d24c --- /dev/null +++ b/src/xrpld/telemetry/detail/ValidationTracker.cpp @@ -0,0 +1,216 @@ +/** @file ValidationTracker.cpp + Implementation of the ValidationTracker class. +*/ + +#include + +#include + +namespace xrpl { +namespace telemetry { + +void +ValidationTracker::recordOurValidation(uint256 const& ledgerHash, LedgerIndex seq) +{ + std::lock_guard lock(mutex_); + auto& evt = pending_[ledgerHash]; + if (evt.recordTime == TimePoint{}) + { + // First time seeing this ledger hash -- initialize. + evt.ledgerHash = ledgerHash; + evt.seq = seq; + evt.recordTime = Clock::now(); + } + evt.weValidated = true; + totalValidationsSent_.fetch_add(1, std::memory_order_relaxed); +} + +void +ValidationTracker::recordNetworkValidation(uint256 const& ledgerHash, LedgerIndex seq) +{ + std::lock_guard lock(mutex_); + auto& evt = pending_[ledgerHash]; + if (evt.recordTime == TimePoint{}) + { + evt.ledgerHash = ledgerHash; + evt.seq = seq; + evt.recordTime = Clock::now(); + } + evt.networkValidated = true; + totalValidationsChecked_.fetch_add(1, std::memory_order_relaxed); +} + +void +ValidationTracker::reconcile() +{ + std::lock_guard lock(mutex_); + auto const now = Clock::now(); + + for (auto& [hash, evt] : pending_) + { + if (!evt.reconciled && (now - evt.recordTime) > kGracePeriod) + { + // Initial reconciliation after grace period. + evt.reconciled = true; + evt.agreed = evt.weValidated && evt.networkValidated; + + if (evt.agreed) + totalAgreements_.fetch_add(1, std::memory_order_relaxed); + else + totalMissed_.fetch_add(1, std::memory_order_relaxed); + + WindowEvent we{now, evt.ledgerHash, evt.agreed}; + window1h_.push_back(we); + window24h_.push_back(we); + } + else if ( + evt.reconciled && !evt.agreed && evt.weValidated && evt.networkValidated && + (now - evt.recordTime) <= kLateRepairWindow) + { + // Late repair: was a miss, now both flags set. + evt.agreed = true; + totalMissed_.fetch_sub(1, std::memory_order_relaxed); + totalAgreements_.fetch_add(1, std::memory_order_relaxed); + + // Flip the corresponding window entries from miss to agreement. + repairWindowEntry(window1h_, evt.ledgerHash); + repairWindowEntry(window24h_, evt.ledgerHash); + } + } + + evictStaleWindows(now); + evictOldPending(now); +} + +void +ValidationTracker::evictStaleWindows(TimePoint now) +{ + auto const cutoff1h = now - kWindow1h; + while (!window1h_.empty() && window1h_.front().time < cutoff1h) + window1h_.pop_front(); + + auto const cutoff24h = now - kWindow24h; + while (!window24h_.empty() && window24h_.front().time < cutoff24h) + window24h_.pop_front(); +} + +void +ValidationTracker::evictOldPending(TimePoint now) +{ + auto const cutoff = now - kLateRepairWindow; + for (auto it = pending_.begin(); it != pending_.end();) + { + if (it->second.reconciled && it->second.recordTime < cutoff) + it = pending_.erase(it); + else + ++it; + } + + // Hard trim if still over limit -- remove oldest reconciled entries. + if (pending_.size() > kMaxPendingEvents) + { + for (auto it = pending_.begin(); + it != pending_.end() && pending_.size() > kMaxPendingEvents;) + { + if (it->second.reconciled) + it = pending_.erase(it); + else + ++it; + } + } +} + +double +ValidationTracker::agreementPct1h() const +{ + std::lock_guard lock(mutex_); + if (window1h_.empty()) + return 0.0; + auto const agreed = static_cast( + std::count_if(window1h_.begin(), window1h_.end(), [](auto const& e) { return e.agreed; })); + return (agreed / static_cast(window1h_.size())) * 100.0; +} + +double +ValidationTracker::agreementPct24h() const +{ + std::lock_guard lock(mutex_); + if (window24h_.empty()) + return 0.0; + auto const agreed = static_cast(std::count_if( + window24h_.begin(), window24h_.end(), [](auto const& e) { return e.agreed; })); + return (agreed / static_cast(window24h_.size())) * 100.0; +} + +uint64_t +ValidationTracker::agreements1h() const +{ + std::lock_guard lock(mutex_); + return static_cast( + std::count_if(window1h_.begin(), window1h_.end(), [](auto const& e) { return e.agreed; })); +} + +uint64_t +ValidationTracker::missed1h() const +{ + std::lock_guard lock(mutex_); + return static_cast( + std::count_if(window1h_.begin(), window1h_.end(), [](auto const& e) { return !e.agreed; })); +} + +uint64_t +ValidationTracker::agreements24h() const +{ + std::lock_guard lock(mutex_); + return static_cast(std::count_if( + window24h_.begin(), window24h_.end(), [](auto const& e) { return e.agreed; })); +} + +uint64_t +ValidationTracker::missed24h() const +{ + std::lock_guard lock(mutex_); + return static_cast(std::count_if( + window24h_.begin(), window24h_.end(), [](auto const& e) { return !e.agreed; })); +} + +uint64_t +ValidationTracker::totalAgreements() const +{ + return totalAgreements_.load(std::memory_order_relaxed); +} + +uint64_t +ValidationTracker::totalMissed() const +{ + return totalMissed_.load(std::memory_order_relaxed); +} + +uint64_t +ValidationTracker::totalValidationsSent() const +{ + return totalValidationsSent_.load(std::memory_order_relaxed); +} + +uint64_t +ValidationTracker::totalValidationsChecked() const +{ + return totalValidationsChecked_.load(std::memory_order_relaxed); +} + +void +ValidationTracker::repairWindowEntry(std::deque& window, uint256 const& hash) +{ + // Scan backwards since late repairs target recently added entries. + for (auto it = window.rbegin(); it != window.rend(); ++it) + { + if (!it->agreed && it->ledgerHash == hash) + { + it->agreed = true; + return; + } + } +} + +} // namespace telemetry +} // namespace xrpl