diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index 8a7e25b14..91e9bfc6a 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -9,6 +9,7 @@ target_sources( ClusterCommunicationService.cpp Metrics.cpp WriterDecider.cpp + impl/FallbackRecoveryTimer.cpp ) target_link_libraries(clio_cluster PRIVATE clio_util clio_data) diff --git a/src/cluster/ClioNode.cpp b/src/cluster/ClioNode.cpp index 2c0084bc7..2cdee98b5 100644 --- a/src/cluster/ClioNode.cpp +++ b/src/cluster/ClioNode.cpp @@ -66,6 +66,10 @@ ClioNode::from( return ClioNode::DbRole::Fallback; } + if (writerState.isFallbackRecovery()) { + return ClioNode::DbRole::FallbackRecovery; + } + return writerState.isWriting() ? ClioNode::DbRole::Writer : ClioNode::DbRole::NotWriter; }(); return ClioNode{ @@ -105,7 +109,7 @@ tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) auto dbRole = ClioNode::DbRole::Fallback; if (auto const* v = obj.if_contains(JsonFields::kDB_ROLE)) { auto const dbRoleValue = v->as_int64(); - if (dbRoleValue > static_cast(ClioNode::DbRole::MAX)) + if (dbRoleValue > static_cast(ClioNode::DbRole::Max)) throw std::runtime_error("Invalid db_role value"); dbRole = static_cast(dbRoleValue); } diff --git a/src/cluster/ClioNode.hpp b/src/cluster/ClioNode.hpp index faafd9dba..2659e626f 100644 --- a/src/cluster/ClioNode.hpp +++ b/src/cluster/ClioNode.hpp @@ -44,16 +44,26 @@ struct ClioNode { * @brief Database role of a node in the cluster. 
* * Roles are used to coordinate which node writes to the database: - * - ReadOnly: Node is configured to never write (strict read-only mode) - * - NotWriter: Node can write but is currently not the designated writer - * - Writer: Node is actively writing to the database - * - Fallback: Node is using the fallback writer decision mechanism - * - * When any node in the cluster is in Fallback mode, the entire cluster switches - * from the cluster communication mechanism to the slower but more reliable - * database-based conflict detection mechanism. + * - ReadOnly: Node is configured to never write (strict read-only mode). + * - NotWriter: Node can write but is currently not the designated writer. + * - Writer: Node is actively writing to the database. + * - Fallback: Node is using the fallback writer decision mechanism (slower but + * reliable database-based write-conflict detection). When any non-ReadOnly node + * in the cluster is in this role, the entire cluster switches to fallback mode. + * - FallbackRecovery: Node has been in Fallback long enough to attempt returning to + * election-based writer selection. The node continues participating in the + * fallback write-race while coordinating with peers. Once all non-ReadOnly nodes + * reach this role (or have already returned to election mode), the cluster exits + * fallback and performs a normal election. 
*/ - enum class DbRole { ReadOnly = 0, NotWriter = 1, Writer = 2, Fallback = 3, MAX = 3 }; + enum class DbRole { + ReadOnly = 0, + NotWriter = 1, + Writer = 2, + Fallback = 3, + FallbackRecovery = 4, + Max = 4 + }; using Uuid = std::shared_ptr; using CUuid = std::shared_ptr; diff --git a/src/cluster/WriterDecider.cpp b/src/cluster/WriterDecider.cpp index 64c5e8531..0be2d9bb4 100644 --- a/src/cluster/WriterDecider.cpp +++ b/src/cluster/WriterDecider.cpp @@ -21,24 +21,48 @@ #include "cluster/Backend.hpp" #include "cluster/ClioNode.hpp" +#include "cluster/impl/FallbackRecoveryTimer.hpp" #include "etl/WriterState.hpp" #include "util/Assert.hpp" #include "util/Spawn.hpp" +#include #include #include +#include #include #include #include namespace cluster { -WriterDecider::WriterDecider( - boost::asio::thread_pool& ctx, +namespace { + +void +startFallbackRecoveryTimer( + impl::FallbackRecoveryTimer& fallbackRecoveryTimer, std::unique_ptr writerState ) - : ctx_(ctx), writerState_(std::move(writerState)) +{ + fallbackRecoveryTimer.start( // + [ws = std::move(writerState)](boost::system::error_code ec) mutable { + if (ec == boost::asio::error::operation_aborted) + return; + ASSERT(!ec, "Unexpected error {}: {}", ec.value(), ec.to_string()); + ws->setFallbackRecovery(true); + } + ); +} + +} // namespace + +WriterDecider::WriterDecider( + boost::asio::thread_pool& ctx, + std::unique_ptr writerState, + std::chrono::steady_clock::duration recoveryTime +) + : ctx_(ctx), writerState_(std::move(writerState)), fallbackRecoveryTimer_(ctx, recoveryTime) { } @@ -55,27 +79,53 @@ WriterDecider::onNewState( ctx_, [writerState = writerState_->clone(), selfId = std::move(selfId), + fallbackRecoveryTimer = fallbackRecoveryTimer_, clusterData = clusterData->value()](auto&&) mutable { auto const selfData = std::ranges::find_if( clusterData, [&selfId](ClioNode const& node) { return node.uuid == selfId; } ); ASSERT(selfData != clusterData.end(), "Self data should always be in the cluster data"); 
- if (selfData->dbRole == ClioNode::DbRole::Fallback) { - return; - } - if (selfData->dbRole == ClioNode::DbRole::ReadOnly) { writerState->giveUpWriting(); return; } + if (selfData->dbRole == ClioNode::DbRole::Fallback) { + auto const clusterInFallbackRecoveryState = + std::ranges::any_of(clusterData, [](ClioNode const& node) { + return node.dbRole == ClioNode::DbRole::FallbackRecovery; + }); + if (clusterInFallbackRecoveryState) { + writerState->setFallbackRecovery(true); + fallbackRecoveryTimer.cancel(); + } else if (not fallbackRecoveryTimer.isRunning()) { + startFallbackRecoveryTimer(fallbackRecoveryTimer, std::move(writerState)); + } + return; + } + + if (selfData->dbRole == ClioNode::DbRole::FallbackRecovery) { + auto const clusterIsReadyToRecover = + not std::ranges::any_of(clusterData, [](ClioNode const& node) { + return node.dbRole == ClioNode::DbRole::Fallback; + }); + if (clusterIsReadyToRecover) { + writerState->giveUpWriting(); + writerState->setFallbackRecovery(false); + } + return; + } + // If any node in the cluster is in Fallback mode, the entire cluster must switch // to the fallback writer decision mechanism for consistency - if (std::ranges::any_of(clusterData, [](ClioNode const& node) { + auto const clusterInFallbackState = + std::ranges::any_of(clusterData, [](ClioNode const& node) { return node.dbRole == ClioNode::DbRole::Fallback; - })) { + }); + if (clusterInFallbackState) { writerState->setWriterDecidingFallback(); + startFallbackRecoveryTimer(fallbackRecoveryTimer, std::move(writerState)); return; } diff --git a/src/cluster/WriterDecider.hpp b/src/cluster/WriterDecider.hpp index 4c1bfa4ce..fb5fc65bf 100644 --- a/src/cluster/WriterDecider.hpp +++ b/src/cluster/WriterDecider.hpp @@ -21,10 +21,12 @@ #include "cluster/Backend.hpp" #include "cluster/ClioNode.hpp" +#include "cluster/impl/FallbackRecoveryTimer.hpp" #include "etl/WriterState.hpp" #include +#include #include namespace cluster { @@ -33,44 +35,112 @@ namespace cluster { * @brief 
Decides which node in the cluster should be the writer based on cluster state. * * This class monitors cluster state changes and determines whether the current node - * should act as the writer to the database. The decision is made by: - * 1. Sorting all nodes by UUID for deterministic ordering - * 2. Selecting the first node that is allowed to write (not ReadOnly) - * 3. Activating writing on this node if it's the current node, otherwise deactivating + * should act as the writer to the database. * - * This ensures only one node in the cluster actively writes to the database at a time. + * ## Election (normal operation) + * + * All non-ReadOnly nodes are sorted by UUID. The first node with @c etlStarted and + * @c cacheIsFull is elected writer. If no fully-ready node exists, the first node + * with @c etlStarted is chosen. All others give up writing. + * + * ## Fallback mode + * + * Fallback is the slower but more reliable mechanism based on database write-conflict + * detection (a node waits ~10 s of DB silence before writing). The cluster enters + * fallback whenever any non-ReadOnly node publishes @c DbRole::Fallback — for example + * during a rolling upgrade when an old node without cluster-coordination support is + * present. + * + * ## Fallback recovery + * + * To avoid the cluster staying in fallback indefinitely, a recovery timer is started + * when this node enters fallback. After the timer fires the node enters + * @c DbRole::FallbackRecovery and coordinates with peers to return to election mode. + * If any peer is already in @c FallbackRecovery, the node joins immediately (contagion + * rule), cancelling its own pending timer. 
+ * + * ## State machine for `onNewState` + * + * @code + * + * sees any Fallback node + * [election mode] ──────────────────────────────► [Fallback] + * (NotWriter / │ + * Writer) recovery timer fires + * ▲ (1 hour) + * │ OR sees FallbackRecovery + * │ node (contagion rule) + * │ │ + * │ ▼ + * │ no Fallback nodes visible [FallbackRecovery] + * └───────────────────────────────────────────────── + * + * @endcode + * + * Nodes in FallbackRecovery continue the fallback write-race so there is no write + * availability gap during the coordination phase. */ class WriterDecider { +public: + static constexpr std::chrono::steady_clock::duration kRECOVERY_TIME = std::chrono::hours{1}; + +private: /** @brief Thread pool for spawning asynchronous tasks */ boost::asio::thread_pool& ctx_; /** @brief Interface for controlling the writer state of this node */ std::unique_ptr writerState_; + /** + * @brief Timer that fires after a delay to initiate fallback recovery. + * + * Started when this node enters @c DbRole::Fallback (either via election-mode + * transition or via an externally triggered fallback). Cancelled when the node + * transitions to @c DbRole::FallbackRecovery (timer fired or contagion rule). + * Copied into spawned task closures by value — all copies share the same + * underlying mutex-protected state. + */ + impl::FallbackRecoveryTimer fallbackRecoveryTimer_; + public: /** * @brief Constructs a WriterDecider. 
* - * @param ctx Thread pool for executing asynchronous operations - * @param writerState Writer state interface for controlling write operations + * @param ctx Thread pool for executing asynchronous operations + * @param writerState Writer state interface for controlling write operations + * @param recoveryTime How long to wait in Fallback before attempting recovery + * (defaults to `kRECOVERY_TIME`; pass a short duration in tests) */ WriterDecider( boost::asio::thread_pool& ctx, - std::unique_ptr writerState + std::unique_ptr writerState, + std::chrono::steady_clock::duration recoveryTime = kRECOVERY_TIME ); /** * @brief Handles cluster state changes and decides whether this node should be the writer. * - * This method is called when cluster state changes. It asynchronously: - * - Sorts all nodes by UUID to establish a deterministic order - * - Identifies the first node allowed to write (not ReadOnly) - * - Activates writing if this node is selected, otherwise deactivates writing - * - Logs a warning if no nodes in the cluster are allowed to write + * Spawns an asynchronous task that applies the state machine described in the class + * documentation. Decisions are based on the @p clusterData snapshot: + * + * - If @p clusterData has no value (communication failure), no action is taken. + * - If self is @c ReadOnly, writing is given up unconditionally. + * - If self is @c Fallback and a @c FallbackRecovery node is visible, the contagion + * rule applies: this node also enters @c FallbackRecovery and the recovery timer + * is cancelled. + * - If self is @c Fallback and the recovery timer is not running, it is started + * (handles the case where fallback was triggered externally, e.g. by Monitor). + * - If self is @c FallbackRecovery and no @c Fallback nodes are visible, the + * recovery coordination is complete: writing is given up and the fallback recovery + * flag is cleared so the node enters election mode on the next cycle. 
+ * - If self is in election mode and any @c Fallback node is visible, this node + * switches to @c Fallback and the recovery timer is started. + * - Otherwise, election proceeds: nodes are sorted by UUID and the first fully-ready + * (@c etlStarted && @c cacheIsFull) non-ReadOnly node is elected writer. * * @param selfId The UUID of the current node - * @param clusterData Shared pointer to current cluster data; may be empty if communication - * failed + * @param clusterData Shared pointer to current cluster data; may be empty if + * communication failed */ void onNewState(ClioNode::CUuid selfId, std::shared_ptr clusterData); diff --git a/src/cluster/impl/FallbackRecoveryTimer.cpp b/src/cluster/impl/FallbackRecoveryTimer.cpp new file mode 100644 index 000000000..ed45ff3a3 --- /dev/null +++ b/src/cluster/impl/FallbackRecoveryTimer.cpp @@ -0,0 +1,53 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#include "cluster/impl/FallbackRecoveryTimer.hpp" + +#include "util/Mutex.hpp" + +#include + +#include +#include + +namespace cluster::impl { + +FallbackRecoveryTimer::FallbackRecoveryTimer( + boost::asio::thread_pool& ctx, + std::chrono::steady_clock::duration recoveryTime +) + : impl_(std::make_shared>(Impl{ctx.get_executor(), recoveryTime})) +{ +} + +bool +FallbackRecoveryTimer::isRunning() const +{ + return impl_->lock()->isRunning; +} + +void +FallbackRecoveryTimer::cancel() +{ + auto locked = impl_->lock(); + locked->isRunning = false; + locked->timer.cancel(); +} + +} // namespace cluster::impl diff --git a/src/cluster/impl/FallbackRecoveryTimer.hpp b/src/cluster/impl/FallbackRecoveryTimer.hpp new file mode 100644 index 000000000..7871d08f3 --- /dev/null +++ b/src/cluster/impl/FallbackRecoveryTimer.hpp @@ -0,0 +1,110 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#pragma once + +#include "util/Mutex.hpp" + +#include +#include +#include +#include + +#include +#include + +namespace cluster::impl { + +/** + * @brief One-shot timer for fallback recovery, with internal thread-safety. + * + * The timer state — including the @c boost::asio::steady_timer, the running flag, + * and the recovery duration — is held inside a mutex-protected @c Impl struct + * referenced via @c shared_ptr. This makes @c FallbackRecoveryTimer cheap to + * copy: all copies share the same underlying state. All public methods are + * thread-safe; callers do not need an external @c util::Mutex. + */ +class FallbackRecoveryTimer { + struct Impl { + boost::asio::steady_timer timer; + bool isRunning = false; + std::chrono::steady_clock::duration recoveryTime; + + Impl( + boost::asio::any_io_executor const& executor, + std::chrono::steady_clock::duration recoveryTime + ) + : timer(executor), recoveryTime(recoveryTime) + { + } + }; + + std::shared_ptr> impl_; + +public: + /** + * @brief Construct a timer bound to the given thread pool. + * + * @param ctx Thread pool whose executor the timer is posted to. + * @param recoveryTime Duration to wait before the timer fires. + */ + FallbackRecoveryTimer( + boost::asio::thread_pool& ctx, + std::chrono::steady_clock::duration recoveryTime + ); + + /** + * @brief Returns @c true if an @c async_wait is currently pending. + */ + [[nodiscard]] bool + isRunning() const; + + /** + * @brief Arm the timer and schedule @p callback for when it fires. + * + * Sets the running flag, calls @c expires_after, and posts @c async_wait. On expiry + * the internal handler clears the running flag before invoking @p callback. On + * @c boost::asio::error::operation_aborted (cancel or re-arm) the flag is left as set by + * @c cancel / @c start, and @p callback still receives the error — callers must handle this. 
+ * + * @param callback Invoked with the error code when the timer fires or is cancelled. + */ + template + void + start(Callback&& callback) + { + auto locked = impl_->lock(); + locked->isRunning = true; + locked->timer.expires_after(locked->recoveryTime); + locked->timer.async_wait([impl = impl_, cb = std::forward(callback)]( + boost::system::error_code ec + ) mutable { + if (ec != boost::asio::error::operation_aborted) impl->lock()->isRunning = false; + cb(ec); + }); + } + + /** + * @brief Cancel any pending @c async_wait and clear the running flag. + */ + void + cancel(); +}; + +} // namespace cluster::impl diff --git a/src/etl/WriterState.cpp b/src/etl/WriterState.cpp index 28162a5a6..daf6f5b94 100644 --- a/src/etl/WriterState.cpp +++ b/src/etl/WriterState.cpp @@ -69,6 +69,7 @@ void WriterState::setWriterDecidingFallback() { systemState_->isWriterDecidingFallback = true; + isFallbackRecovery_ = false; } bool @@ -77,6 +78,21 @@ WriterState::isFallback() const return systemState_->isWriterDecidingFallback; } +bool +WriterState::isFallbackRecovery() const +{ + return isFallbackRecovery_; +} + +void +WriterState::setFallbackRecovery(bool newValue) +{ + if (newValue) { + systemState_->isWriterDecidingFallback = false; + } + isFallbackRecovery_ = newValue; +} + bool WriterState::isEtlStarted() const { diff --git a/src/etl/WriterState.hpp b/src/etl/WriterState.hpp index 7ee0d47bf..259afb34d 100644 --- a/src/etl/WriterState.hpp +++ b/src/etl/WriterState.hpp @@ -21,6 +21,9 @@ #include "data/LedgerCacheInterface.hpp" #include "etl/SystemState.hpp" +#include "util/prometheus/Bool.hpp" +#include "util/prometheus/Label.hpp" +#include "util/prometheus/Prometheus.hpp" #include #include @@ -78,6 +81,36 @@ public: [[nodiscard]] virtual bool isFallback() const = 0; + /** + * @brief Check if this node is in fallback recovery mode. + * + * Fallback recovery is an intermediate state entered when the node has been in + * fallback mode long enough to attempt returning to election-based writer selection. 
+ * In this state the node continues participating in the fallback write-race while + * coordinating with other nodes to exit fallback together. + * + * @return true if the node is in fallback recovery mode, false otherwise + */ + [[nodiscard]] virtual bool + isFallbackRecovery() const = 0; + + /** + * @brief Set or clear the fallback recovery flag. + * + * When @p newValue is true, the node enters fallback recovery mode: + * - @ref isFallbackRecovery returns true + * - The plain fallback flag (@ref isFallback) is cleared so the node no longer + * publishes @c DbRole::Fallback; it publishes @c DbRole::FallbackRecovery instead. + * + * When @p newValue is false, the recovery flag is cleared without touching the + * plain fallback flag. This is used when the recovery coordination completes and + * the node transitions back to election mode. + * + * @param newValue true to enter recovery mode, false to leave it + */ + virtual void + setFallbackRecovery(bool newValue) = 0; + /** * @brief Switch the cluster to the fallback writer decision mechanism. * @@ -85,6 +118,9 @@ public: * communication mechanism to the slower but more reliable fallback mechanism. * Once set, this flag propagates to all nodes in the cluster through the * ClioNode DbRole::Fallback state. + * + * Also clears the fallback recovery flag (@ref isFallbackRecovery) because entering + * a fresh fallback period cancels any in-progress recovery attempt. */ virtual void setWriterDecidingFallback() = 0; @@ -131,6 +167,20 @@ private: systemState_; /**< @brief Shared system state for ETL coordination */ std::reference_wrapper cache_; + /** + * @brief Prometheus metric tracking whether this node is in fallback recovery mode. + * + * @note Because @c prometheus::Bool holds a @c std::reference_wrapper to the underlying + * gauge, copies of @c WriterState (including clones) share the same metric value. + * Mutations made through a clone are therefore immediately visible on the original + * instance and vice-versa. 
+ */ + util::prometheus::Bool isFallbackRecovery_ = PrometheusService::boolMetric( + "etl_writing_deciding_fallback_recovery", + util::prometheus::Labels{}, + "Whether clio is in recovery from the fallback writer decision mechanism" + ); + public: /** * @brief Construct a WriterState with the given system state and cache. @@ -185,6 +235,14 @@ public: bool isFallback() const override; + /** @copydoc WriterStateInterface::isFallbackRecovery */ + bool + isFallbackRecovery() const override; + + /** @copydoc WriterStateInterface::setFallbackRecovery */ + void + setFallbackRecovery(bool newValue) override; + /** @copydoc WriterStateInterface::isEtlStarted */ bool isEtlStarted() const override; diff --git a/tests/common/util/MockWriterState.hpp b/tests/common/util/MockWriterState.hpp index 442273918..6794ad599 100644 --- a/tests/common/util/MockWriterState.hpp +++ b/tests/common/util/MockWriterState.hpp @@ -32,6 +32,8 @@ struct MockWriterStateBase : public etl::WriterStateInterface { MOCK_METHOD(void, giveUpWriting, (), (override)); MOCK_METHOD(void, setWriterDecidingFallback, (), (override)); MOCK_METHOD(bool, isFallback, (), (const, override)); + MOCK_METHOD(bool, isFallbackRecovery, (), (const, override)); + MOCK_METHOD(void, setFallbackRecovery, (bool), (override)); MOCK_METHOD(bool, isEtlStarted, (), (const, override)); MOCK_METHOD(bool, isCacheFull, (), (const, override)); MOCK_METHOD(std::unique_ptr, clone, (), (const, override)); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 06a775914..df88b91b9 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -27,6 +27,7 @@ target_sources( cluster/CacheLoaderDeciderTests.cpp cluster/ClioNodeTests.cpp cluster/ClusterCommunicationServiceTests.cpp + cluster/FallbackRecoveryTimerTests.cpp cluster/MetricsTests.cpp cluster/RepeatedTaskTests.cpp cluster/WriterDeciderTests.cpp diff --git a/tests/unit/cluster/BackendTests.cpp b/tests/unit/cluster/BackendTests.cpp index 
fb6d1b797..43415a0c0 100644 --- a/tests/unit/cluster/BackendTests.cpp +++ b/tests/unit/cluster/BackendTests.cpp @@ -234,6 +234,9 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes) EXPECT_CALL(writerStateRef, isFallback) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isFallbackRecovery) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); @@ -463,6 +466,9 @@ TEST_F(ClusterBackendTest, WriteNodeMessageWritesSelfDataWithRecentTimestampAndD EXPECT_CALL(writerStateRef, isFallback) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isFallbackRecovery) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); diff --git a/tests/unit/cluster/ClioNodeTests.cpp b/tests/unit/cluster/ClioNodeTests.cpp index c043de1d1..eb9aa0ca2 100644 --- a/tests/unit/cluster/ClioNodeTests.cpp +++ b/tests/unit/cluster/ClioNodeTests.cpp @@ -217,7 +217,11 @@ INSTANTIATE_TEST_SUITE_P( ClioNodeDbRoleTestBundle{.testName = "ReadOnly", .role = ClioNode::DbRole::ReadOnly}, ClioNodeDbRoleTestBundle{.testName = "NotWriter", .role = ClioNode::DbRole::NotWriter}, ClioNodeDbRoleTestBundle{.testName = "Writer", .role = ClioNode::DbRole::Writer}, - ClioNodeDbRoleTestBundle{.testName = "Fallback", .role = ClioNode::DbRole::Fallback} + ClioNodeDbRoleTestBundle{.testName = "Fallback", .role = ClioNode::DbRole::Fallback}, + ClioNodeDbRoleTestBundle{ + .testName = "FallbackRecovery", + .role = ClioNode::DbRole::FallbackRecovery + } ), tests::util::kNAME_GENERATOR ); @@ -255,6 +259,7 @@ struct ClioNodeFromTestBundle { std::string testName; bool readOnly; bool fallback; + bool fallbackRecovery; bool writing; bool etlStarted; bool 
cacheIsFull; @@ -278,6 +283,7 @@ INSTANTIATE_TEST_SUITE_P( .testName = "ReadOnly", .readOnly = true, .fallback = false, + .fallbackRecovery = false, .writing = false, .etlStarted = false, .cacheIsFull = false, @@ -288,6 +294,7 @@ INSTANTIATE_TEST_SUITE_P( .testName = "Fallback", .readOnly = false, .fallback = true, + .fallbackRecovery = false, .writing = false, .etlStarted = false, .cacheIsFull = false, @@ -298,6 +305,7 @@ INSTANTIATE_TEST_SUITE_P( .testName = "NotWriter", .readOnly = false, .fallback = false, + .fallbackRecovery = false, .writing = false, .etlStarted = true, .cacheIsFull = false, @@ -308,11 +316,23 @@ INSTANTIATE_TEST_SUITE_P( .testName = "Writer", .readOnly = false, .fallback = false, + .fallbackRecovery = false, .writing = true, .etlStarted = true, .cacheIsFull = true, .cacheIsCurrentlyLoading = true, .expectedRole = ClioNode::DbRole::Writer + }, + ClioNodeFromTestBundle{ + .testName = "FallbackRecovery", + .readOnly = false, + .fallback = false, + .fallbackRecovery = true, + .writing = false, + .etlStarted = false, + .cacheIsFull = false, + .cacheIsCurrentlyLoading = false, + .expectedRole = ClioNode::DbRole::FallbackRecovery } ), tests::util::kNAME_GENERATOR @@ -326,7 +346,11 @@ TEST_P(ClioNodeFromTest, FromWriterState) if (not param.readOnly) { EXPECT_CALL(writerState, isFallback()).WillOnce(testing::Return(param.fallback)); if (not param.fallback) { - EXPECT_CALL(writerState, isWriting()).WillOnce(testing::Return(param.writing)); + EXPECT_CALL(writerState, isFallbackRecovery()) + .WillOnce(testing::Return(param.fallbackRecovery)); + if (not param.fallbackRecovery) { + EXPECT_CALL(writerState, isWriting()).WillOnce(testing::Return(param.writing)); + } } } EXPECT_CALL(writerState, isEtlStarted()).WillOnce(testing::Return(param.etlStarted)); diff --git a/tests/unit/cluster/FallbackRecoveryTimerTests.cpp b/tests/unit/cluster/FallbackRecoveryTimerTests.cpp new file mode 100644 index 000000000..d387bfafe --- /dev/null +++ 
b/tests/unit/cluster/FallbackRecoveryTimerTests.cpp @@ -0,0 +1,143 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "cluster/impl/FallbackRecoveryTimer.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace cluster::impl; +using namespace testing; + +struct FallbackRecoveryTimerTest : Test { + ~FallbackRecoveryTimerTest() override + { + ctx_.join(); + } + +protected: + boost::asio::thread_pool ctx_{1}; +}; + +TEST_F(FallbackRecoveryTimerTest, NotRunningByDefault) +{ + FallbackRecoveryTimer timer{ctx_, std::chrono::hours{1}}; + EXPECT_FALSE(timer.isRunning()); +} + +TEST_F(FallbackRecoveryTimerTest, IsRunningAfterStart) +{ + FallbackRecoveryTimer timer{ctx_, std::chrono::hours{1}}; + timer.start([](boost::system::error_code) {}); + EXPECT_TRUE(timer.isRunning()); + timer.cancel(); +} + +TEST_F(FallbackRecoveryTimerTest, NotRunningAfterCancel) +{ + FallbackRecoveryTimer timer{ctx_, std::chrono::hours{1}}; + timer.start([](boost::system::error_code) {}); + 
timer.cancel();
+    EXPECT_FALSE(timer.isRunning());
+}
+
+TEST_F(FallbackRecoveryTimerTest, CallbackFiresWithNoError)
+{
+    std::binary_semaphore sem{0};
+    std::atomic callbackFired{false};
+    boost::system::error_code capturedEc{};
+
+    FallbackRecoveryTimer timer{ctx_, std::chrono::milliseconds{0}};
+    timer.start([&](boost::system::error_code ec) {
+        capturedEc = ec;
+        callbackFired = true;
+        sem.release();
+    });
+
+    EXPECT_TRUE(sem.try_acquire_for(std::chrono::seconds{5}));
+    EXPECT_TRUE(callbackFired);
+    EXPECT_FALSE(capturedEc);
+}
+
+TEST_F(FallbackRecoveryTimerTest, NotRunningAfterCallbackFires)
+{
+    std::binary_semaphore sem{0};
+
+    FallbackRecoveryTimer timer{ctx_, std::chrono::milliseconds{0}};
+    timer.start([&](boost::system::error_code) { sem.release(); });
+
+    EXPECT_TRUE(sem.try_acquire_for(std::chrono::seconds{5}));
+    // Give the callback a moment to clear isRunning_
+    ctx_.join();
+    EXPECT_FALSE(timer.isRunning());
+}
+
+TEST_F(FallbackRecoveryTimerTest, CallbackReceivesOperationAbortedOnCancel)
+{
+    std::binary_semaphore sem{0};
+    boost::system::error_code capturedEc{};
+
+    FallbackRecoveryTimer timer{ctx_, std::chrono::hours{1}};
+    timer.start([&](boost::system::error_code ec) {
+        capturedEc = ec;
+        sem.release();
+    });
+
+    timer.cancel();
+
+    EXPECT_TRUE(sem.try_acquire_for(std::chrono::seconds{5}));
+    EXPECT_EQ(capturedEc, boost::asio::error::operation_aborted);
+}
+
+TEST_F(FallbackRecoveryTimerTest, CancelOnNonRunningTimerIsNoOp)
+{
+    FallbackRecoveryTimer timer{ctx_, std::chrono::hours{1}};
+    EXPECT_FALSE(timer.isRunning());
+    EXPECT_NO_THROW(timer.cancel());
+    EXPECT_FALSE(timer.isRunning());
+}
+
+TEST_F(FallbackRecoveryTimerTest, SharedPtrUsageWorks)
+{
+    std::binary_semaphore sem{0};
+    std::atomic callbackFired{false};
+
+    auto sharedTimer = std::make_shared(ctx_, std::chrono::milliseconds{0});
+
+    EXPECT_FALSE(sharedTimer->isRunning());
+
+    sharedTimer->start([&](boost::system::error_code ec) {
+        if (ec == boost::asio::error::operation_aborted)
+            return;
+        callbackFired = true;
+        sem.release();
+    });
+
+    // No isRunning() check here: a 0 ms timer may already have fired, so it would be racy.
+    EXPECT_TRUE(sem.try_acquire_for(std::chrono::seconds{5}));
+    EXPECT_TRUE(callbackFired);
+}
diff --git a/tests/unit/cluster/WriterDeciderTests.cpp b/tests/unit/cluster/WriterDeciderTests.cpp
index 007c27815..aeae42a39 100644
--- a/tests/unit/cluster/WriterDeciderTests.cpp
+++ b/tests/unit/cluster/WriterDeciderTests.cpp
@@ -37,7 +37,16 @@
 
 using namespace cluster;
 
-enum class ExpectedAction { StartWriting, GiveUpWriting, NoAction, SetFallback };
+namespace {
+
+enum class ExpectedAction {
+    StartWriting,
+    GiveUpWriting,
+    NoAction,
+    SetFallback,
+    SetFallbackRecoveryTrue,        // contagion: clone receives setFallbackRecovery(true)
+    GiveUpAndClearFallbackRecovery  // recovery complete: giveUpWriting + setFallbackRecovery(false)
+};
 
 struct NodeParams {
     uint8_t uuidValue;
@@ -55,6 +64,8 @@ struct WriterDeciderTestParams {
     bool useEmptyClusterData = false;
 };
 
+}  // namespace
+
 struct WriterDeciderTest : testing::TestWithParam {
     ~WriterDeciderTest() override
     {
@@ -99,7 +110,7 @@ TEST_P(WriterDeciderTest, WriterSelection)
 
     auto const selfUuid = makeUuid(params.selfUuidValue);
 
-    WriterDecider decider{ctx, std::move(writerState)};
+    WriterDecider decider{ctx, std::move(writerState), std::chrono::milliseconds{0}};
 
     auto clonedState = std::make_unique();
 
@@ -117,6 +128,18 @@ TEST_P(WriterDeciderTest, WriterSelection)
             break;
         case ExpectedAction::SetFallback:
             EXPECT_CALL(*clonedState, setWriterDecidingFallback());
+            EXPECT_CALL(*clonedState, setFallbackRecovery(true));
+            EXPECT_CALL(writerStateRef, clone())
+                .WillOnce(testing::Return(testing::ByMove(std::move(clonedState))));
+            break;
+        case ExpectedAction::SetFallbackRecoveryTrue:
+            EXPECT_CALL(*clonedState, setFallbackRecovery(true));
+            EXPECT_CALL(writerStateRef, clone())
+                .WillOnce(testing::Return(testing::ByMove(std::move(clonedState))));
+            break;
+        case ExpectedAction::GiveUpAndClearFallbackRecovery:
+            EXPECT_CALL(*clonedState, giveUpWriting());
+            EXPECT_CALL(*clonedState, setFallbackRecovery(false));
             EXPECT_CALL(writerStateRef, clone())
                 .WillOnce(testing::Return(testing::ByMove(std::move(clonedState))));
             break;
@@ -261,10 +284,10 @@ INSTANTIATE_TEST_SUITE_P(
             .expectedAction = ExpectedAction::StartWriting
         },
         WriterDeciderTestParams{
-            .testName = "SelfIsFallbackNoActionTaken",
+            .testName = "SelfIsFallbackNoContagionStartsRecoveryTimer",
             .selfUuidValue = 0x01,
             .nodes = {{0x01, ClioNode::DbRole::Fallback}, {0x02, ClioNode::DbRole::Writer}},
-            .expectedAction = ExpectedAction::NoAction
+            .expectedAction = ExpectedAction::SetFallbackRecoveryTrue
         },
         WriterDeciderTestParams{
             .testName = "OtherNodeIsFallbackSetsFallbackMode",
@@ -402,6 +425,67 @@ INSTANTIATE_TEST_SUITE_P(
                  .etlStarted = true,
                  .cacheIsFull = true}},
             .expectedAction = ExpectedAction::StartWriting
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackOtherIsFallbackRecovery_ContagionApplied",
+            .selfUuidValue = 0x01,
+            .nodes =
+                {{0x01, ClioNode::DbRole::Fallback}, {0x02, ClioNode::DbRole::FallbackRecovery}},
+            .expectedAction = ExpectedAction::SetFallbackRecoveryTrue
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackAllOthersFallbackRecovery_ContagionApplied",
+            .selfUuidValue = 0x01,
+            .nodes =
+                {{0x01, ClioNode::DbRole::Fallback},
+                 {0x02, ClioNode::DbRole::FallbackRecovery},
+                 {0x03, ClioNode::DbRole::FallbackRecovery}},
+            .expectedAction = ExpectedAction::SetFallbackRecoveryTrue
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackNoFallbackRecoveryInCluster_StartsRecoveryTimer",
+            .selfUuidValue = 0x01,
+            .nodes = {{0x01, ClioNode::DbRole::Fallback}, {0x02, ClioNode::DbRole::Fallback}},
+            .expectedAction = ExpectedAction::SetFallbackRecoveryTrue
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackRecoveryNoFallbackNodes_ExitsRecovery",
+            .selfUuidValue = 0x01,
+            .nodes =
+                {{0x01, ClioNode::DbRole::FallbackRecovery},
+                 {0x02, ClioNode::DbRole::FallbackRecovery}},
+            .expectedAction = ExpectedAction::GiveUpAndClearFallbackRecovery
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackRecoverySomePeersStillFallback_Waits",
+            .selfUuidValue = 0x01,
+            .nodes =
+                {{0x01, ClioNode::DbRole::FallbackRecovery}, {0x02, ClioNode::DbRole::Fallback}},
+            .expectedAction = ExpectedAction::NoAction
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackRecoveryAllPeersElectionMode_ExitsRecovery",
+            .selfUuidValue = 0x01,
+            .nodes =
+                {{0x01, ClioNode::DbRole::FallbackRecovery},
+                 {0x02, ClioNode::DbRole::NotWriter},
+                 {0x03, ClioNode::DbRole::Writer}},
+            .expectedAction = ExpectedAction::GiveUpAndClearFallbackRecovery
+        },
+        WriterDeciderTestParams{
+            .testName = "SelfIsFallbackRecoveryMixedFallbackAndRecovery_Waits",
+            .selfUuidValue = 0x01,
+            .nodes =
+                {{0x01, ClioNode::DbRole::FallbackRecovery},
+                 {0x02, ClioNode::DbRole::FallbackRecovery},
+                 {0x03, ClioNode::DbRole::Fallback}},
+            .expectedAction = ExpectedAction::NoAction
+        },
+        WriterDeciderTestParams{
+            .testName = "ElectionModeSeesOnlyFallbackRecovery_NoFallbackSwitch",
+            .selfUuidValue = 0x01,
+            .nodes = {{0x01, ClioNode::DbRole::Writer}, {0x02, ClioNode::DbRole::FallbackRecovery}},
+            .expectedAction = ExpectedAction::StartWriting
         }
     ),
     [](testing::TestParamInfo const& info) { return info.param.testName; }
diff --git a/tests/unit/etl/WriterStateTests.cpp b/tests/unit/etl/WriterStateTests.cpp
index a7555e978..b4c3d199c 100644
--- a/tests/unit/etl/WriterStateTests.cpp
+++ b/tests/unit/etl/WriterStateTests.cpp
@@ -171,3 +171,72 @@ TEST_F(WriterStateTest, ClonedInstanceSharesSystemState)
     EXPECT_TRUE(writerState.isFallback());
     EXPECT_TRUE(cloned->isFallback());
 }
+
+TEST_F(WriterStateTest, IsFallbackRecoveryReturnsFalseByDefault)
+{
+    EXPECT_FALSE(writerState.isFallbackRecovery());
+}
+
+TEST_F(WriterStateTest, SetFallbackRecoveryTrueSetsFlag)
+{
+    writerState.setFallbackRecovery(true);
+    EXPECT_TRUE(writerState.isFallbackRecovery());
+}
+
+TEST_F(WriterStateTest, SetFallbackRecoveryTrueClearsFallbackFlag)
+{
+    systemState->isWriterDecidingFallback = true;
+    EXPECT_TRUE(writerState.isFallback());
+
+    writerState.setFallbackRecovery(true);
+
+    EXPECT_FALSE(writerState.isFallback());
+    EXPECT_TRUE(writerState.isFallbackRecovery());
+}
+
+TEST_F(WriterStateTest, SetFallbackRecoveryFalseClearsFlag)
+{
+    writerState.setFallbackRecovery(true);
+    ASSERT_TRUE(writerState.isFallbackRecovery());
+
+    writerState.setFallbackRecovery(false);
+    EXPECT_FALSE(writerState.isFallbackRecovery());
+}
+
+TEST_F(WriterStateTest, SetFallbackRecoveryFalseDoesNotAffectFallbackFlag)
+{
+    systemState->isWriterDecidingFallback = true;
+
+    writerState.setFallbackRecovery(false);
+
+    EXPECT_TRUE(writerState.isFallback());
+}
+
+TEST_F(WriterStateTest, SetWriterDecidingFallbackClearsFallbackRecovery)
+{
+    writerState.setFallbackRecovery(true);
+    ASSERT_TRUE(writerState.isFallbackRecovery());
+
+    writerState.setWriterDecidingFallback();
+
+    EXPECT_FALSE(writerState.isFallbackRecovery());
+    EXPECT_TRUE(writerState.isFallback());
+}
+
+TEST_F(WriterStateTest, ClonedInstanceSharesFallbackRecovery)
+{
+    // prometheus::Bool holds a reference_wrapper to the underlying gauge,
+    // so clone and original share the same metric value.
+    auto cloned = writerState.clone();
+
+    EXPECT_FALSE(writerState.isFallbackRecovery());
+    EXPECT_FALSE(cloned->isFallbackRecovery());
+
+    systemState->isWriterDecidingFallback = true;  // precondition for setFallbackRecovery(true)
+    cloned->setFallbackRecovery(true);
+
+    EXPECT_TRUE(writerState.isFallbackRecovery());
+    EXPECT_TRUE(cloned->isFallbackRecovery());
+    // setFallbackRecovery(true) also clears the fallback flag on shared SystemState
+    EXPECT_FALSE(writerState.isFallback());
+}