diff --git a/docs/config-description.md b/docs/config-description.md index 9c6516474..312eac6b8 100644 --- a/docs/config-description.md +++ b/docs/config-description.md @@ -433,6 +433,14 @@ This document provides a list of all available Clio configuration properties in - **Constraints**: The minimum value is `1`. The maximum value is `65535`. - **Description**: The number of ledger objects to fetch concurrently per marker. +### cache.limit_load_in_cluster + +- **Required**: True +- **Type**: boolean +- **Default value**: `False` +- **Constraints**: None +- **Description**: If enabled only one clio node in a cluster (sharing the same database) will load cache at a time + ### cache.load - **Required**: True diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 5ef8a5ba4..4e5a3e56f 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -125,10 +125,9 @@ ClioApplication::run(bool const useNgWebServer) auto systemState = etl::SystemState::makeSystemState(config_); - cluster::ClusterCommunicationService clusterCommunicationService{ - backend, std::make_unique(systemState, cache) - }; - clusterCommunicationService.run(); + auto [clusterCommunicationService, cacheLoadingState] = + cluster::ClusterCommunicationService::make(config_, backend, systemState); + clusterCommunicationService->run(); auto const amendmentCenter = std::make_shared(backend); @@ -160,7 +159,14 @@ ClioApplication::run(bool const useNgWebServer) // ETL is responsible for writing and publishing to streams. 
In read-only mode, ETL only // publishes auto etl = etl::ETLService::makeETLService( - config_, std::move(systemState), ctx, backend, subscriptions, balancer, ledgers + config_, + std::move(systemState), + std::move(cacheLoadingState), + ctx, + backend, + subscriptions, + balancer, + ledgers ); auto workQueue = rpc::WorkQueue::makeWorkQueue(config_); @@ -218,7 +224,7 @@ ClioApplication::run(bool const useNgWebServer) *subscriptions, *backend, cacheSaver, - clusterCommunicationService, + *clusterCommunicationService, ioc ) ); @@ -245,7 +251,7 @@ ClioApplication::run(bool const useNgWebServer) *subscriptions, *backend, cacheSaver, - clusterCommunicationService, + *clusterCommunicationService, ioc ) ); diff --git a/src/cluster/Backend.cpp b/src/cluster/Backend.cpp index 9a44df18d..038eb54c2 100644 --- a/src/cluster/Backend.cpp +++ b/src/cluster/Backend.cpp @@ -21,6 +21,7 @@ #include "cluster/ClioNode.hpp" #include "data/BackendInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" #include "etl/WriterState.hpp" #include @@ -45,11 +46,13 @@ Backend::Backend( boost::asio::thread_pool& ctx, std::shared_ptr backend, std::unique_ptr writerState, + std::unique_ptr cacheLoadingState, std::chrono::steady_clock::duration readInterval, std::chrono::steady_clock::duration writeInterval ) : backend_(std::move(backend)) , writerState_(std::move(writerState)) + , cacheLoadingState_(std::move(cacheLoadingState)) , readerTask_(readInterval, ctx) , writerTask_(writeInterval, ctx) , selfUuid_(std::make_shared(boost::uuids::random_generator{}())) @@ -120,14 +123,14 @@ Backend::doRead(boost::asio::yield_context yield) *expectedNodeData->uuid = uuid; otherNodesData.push_back(std::move(expectedNodeData).value()); } - otherNodesData.push_back(ClioNode::from(selfUuid_, *writerState_)); + otherNodesData.push_back(ClioNode::from(selfUuid_, *writerState_, *cacheLoadingState_)); return otherNodesData; } void Backend::doWrite() { - auto const selfData = ClioNode::from(selfUuid_, 
*writerState_); + auto const selfData = ClioNode::from(selfUuid_, *writerState_, *cacheLoadingState_); boost::json::value jsonValue{}; boost::json::value_from(selfData, jsonValue); backend_->writeNodeMessage(*selfData.uuid, boost::json::serialize(jsonValue.as_object())); diff --git a/src/cluster/Backend.hpp b/src/cluster/Backend.hpp index 9dc6196b9..c205c2e1d 100644 --- a/src/cluster/Backend.hpp +++ b/src/cluster/Backend.hpp @@ -22,6 +22,7 @@ #include "cluster/ClioNode.hpp" #include "cluster/impl/RepeatedTask.hpp" #include "data/BackendInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" #include "etl/WriterState.hpp" #include "util/log/Logger.hpp" @@ -63,6 +64,7 @@ private: std::shared_ptr backend_; std::unique_ptr writerState_; + std::unique_ptr cacheLoadingState_; impl::RepeatedTask readerTask_; impl::RepeatedTask writerTask_; @@ -78,6 +80,7 @@ public: * @param ctx The execution context for asynchronous operations * @param backend Interface to the backend database * @param writerState State indicating whether this node is writing to the database + * @param cacheLoadingState State controlling whether this node is allowed to load the cache * @param readInterval How often to read cluster state from the backend * @param writeInterval How often to write this node's state to the backend */ @@ -85,6 +88,7 @@ public: boost::asio::thread_pool& ctx, std::shared_ptr backend, std::unique_ptr writerState, + std::unique_ptr cacheLoadingState, std::chrono::steady_clock::duration readInterval, std::chrono::steady_clock::duration writeInterval ); diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index e7fa1ab38..8a7e25b14 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -4,6 +4,7 @@ target_sources( clio_cluster PRIVATE Backend.cpp + CacheLoaderDecider.cpp ClioNode.cpp ClusterCommunicationService.cpp Metrics.cpp diff --git a/src/cluster/CacheLoaderDecider.cpp b/src/cluster/CacheLoaderDecider.cpp new file mode 100644 index 
000000000..518fc424e --- /dev/null +++ b/src/cluster/CacheLoaderDecider.cpp @@ -0,0 +1,93 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "cluster/CacheLoaderDecider.hpp" + +#include "cluster/Backend.hpp" +#include "cluster/ClioNode.hpp" +#include "data/LedgerCacheLoadingState.hpp" +#include "util/Assert.hpp" +#include "util/Spawn.hpp" + +#include + +#include +#include +#include +#include +#include + +namespace cluster { + +CacheLoaderDecider::CacheLoaderDecider( + boost::asio::thread_pool& ctx, + std::unique_ptr cacheLoadingState +) + : ctx_(ctx), cacheLoadingState_(std::move(cacheLoadingState)) +{ +} + +void +CacheLoaderDecider::onNewState( + ClioNode::CUuid selfId, + std::shared_ptr clusterData +) +{ + if (not clusterData->has_value()) + return; + + util::spawn( + ctx_, + [cacheLoadingState = cacheLoadingState_->clone(), + selfId = std::move(selfId), + clusterData = clusterData->value()](auto&&) mutable { + auto const selfData = std::ranges::find_if( + clusterData, [&selfId](ClioNode const& node) { return node.uuid == 
selfId; } + ); + ASSERT(selfData != clusterData.end(), "Self data should always be in the cluster data"); + + if (selfData->cacheIsFull) + return; + + std::vector notFullNodes; + std::ranges::copy_if( + clusterData, std::back_inserter(notFullNodes), [](ClioNode const& node) { + return not node.cacheIsFull; + } + ); + + auto const someNodeIsLoadingCache = std::ranges::any_of( + notFullNodes, [](ClioNode const& node) { return node.cacheIsCurrentlyLoading; } + ); + if (someNodeIsLoadingCache) { + return; + } + + std::ranges::sort(notFullNodes, [](ClioNode const& lhs, ClioNode const& rhs) { + return *lhs.uuid < *rhs.uuid; + }); + + if (*notFullNodes.front().uuid == *selfId) { + cacheLoadingState->allowLoading(); + } + } + ); +} + +} // namespace cluster diff --git a/src/cluster/CacheLoaderDecider.hpp b/src/cluster/CacheLoaderDecider.hpp new file mode 100644 index 000000000..448d1cfad --- /dev/null +++ b/src/cluster/CacheLoaderDecider.hpp @@ -0,0 +1,80 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#pragma once + +#include "cluster/Backend.hpp" +#include "cluster/ClioNode.hpp" +#include "data/LedgerCacheLoadingState.hpp" + +#include + +#include + +namespace cluster { + +/** + * @brief Decides which node in the cluster should load the ledger cache. + * + * This class monitors cluster state changes and determines whether the current node + * should begin loading the ledger cache from the backend. The decision is made by: + * 1. Doing nothing if this node's cache is already full + * 2. Doing nothing if any node in the cluster is currently loading the cache + * 3. Sorting all nodes whose cache is not yet full by UUID for deterministic ordering + * 4. Permitting loading on this node if it is the first in the sorted list + * + * This ensures at most one node in the cluster loads the cache at a time. + */ +class CacheLoaderDecider { + /** @brief Thread pool for spawning asynchronous tasks */ + boost::asio::thread_pool& ctx_; + + /** @brief Interface for controlling cache loading permission of this node */ + std::unique_ptr cacheLoadingState_; + +public: + /** + * @brief Constructs a CacheLoaderDecider. + * + * @param ctx Thread pool for executing asynchronous operations + * @param cacheLoadingState Cache loading state interface for permitting cache load + */ + CacheLoaderDecider( + boost::asio::thread_pool& ctx, + std::unique_ptr cacheLoadingState + ); + + /** + * @brief Handles cluster state changes and decides whether this node should load the cache. + * + * This method is called when cluster state changes. 
It asynchronously: + * - Does nothing if this node's cache is already full + * - Does nothing if any node in the cluster is currently loading the cache + * - Sorts all not-yet-full nodes by UUID to establish a deterministic order + * - Permits cache loading on this node if it is first in the sorted list + * + * @param selfId The UUID of the current node + * @param clusterData Shared pointer to current cluster data; may be empty if communication + * failed + */ + void + onNewState(ClioNode::CUuid selfId, std::shared_ptr clusterData); +}; + +} // namespace cluster diff --git a/src/cluster/ClioNode.cpp b/src/cluster/ClioNode.cpp index 3bfd4b12b..2c0084bc7 100644 --- a/src/cluster/ClioNode.cpp +++ b/src/cluster/ClioNode.cpp @@ -19,6 +19,7 @@ #include "cluster/ClioNode.hpp" +#include "data/LedgerCacheLoadingState.hpp" #include "etl/WriterState.hpp" #include "util/TimeUtils.hpp" @@ -44,12 +45,18 @@ struct JsonFields { static constexpr std::string_view const kDB_ROLE = "db_role"; static constexpr std::string_view const kETL_STARTED = "etl_started"; static constexpr std::string_view const kCACHE_IS_FULL = "cache_is_full"; + static constexpr std::string_view const kCACHE_IS_CURRENTLY_LOADING = + "cache_is_currently_loading"; }; } // namespace ClioNode -ClioNode::from(ClioNode::Uuid uuid, etl::WriterStateInterface const& writerState) +ClioNode::from( + ClioNode::Uuid uuid, + etl::WriterStateInterface const& writerState, + data::LedgerCacheLoadingStateInterface const& cacheLoadingState +) { auto const dbRole = [&writerState]() { if (writerState.isReadOnly()) { @@ -66,7 +73,8 @@ ClioNode::from(ClioNode::Uuid uuid, etl::WriterStateInterface const& writerState .updateTime = std::chrono::system_clock::now(), .dbRole = dbRole, .etlStarted = writerState.isEtlStarted(), - .cacheIsFull = writerState.isCacheFull() + .cacheIsFull = writerState.isCacheFull(), + .cacheIsCurrentlyLoading = cacheLoadingState.isCurrentlyLoading() }; } @@ -77,35 +85,49 @@ 
tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& {JsonFields::kUPDATE_TIME, util::systemTpToUtcStr(node.updateTime, ClioNode::kTIME_FORMAT)}, {JsonFields::kDB_ROLE, static_cast(node.dbRole)}, {JsonFields::kETL_STARTED, node.etlStarted}, - {JsonFields::kCACHE_IS_FULL, node.cacheIsFull} + {JsonFields::kCACHE_IS_FULL, node.cacheIsFull}, + {JsonFields::kCACHE_IS_CURRENTLY_LOADING, node.cacheIsCurrentlyLoading} }; } ClioNode tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) { - auto const& updateTimeStr = jv.as_object().at(JsonFields::kUPDATE_TIME).as_string(); + auto const& obj = jv.as_object(); + auto const& updateTimeStr = obj.at(JsonFields::kUPDATE_TIME).as_string(); auto const updateTime = util::systemTpFromUtcStr(std::string(updateTimeStr), ClioNode::kTIME_FORMAT); if (!updateTime.has_value()) { throw std::runtime_error("Failed to parse update time"); } - auto const dbRoleValue = jv.as_object().at(JsonFields::kDB_ROLE).as_int64(); - if (dbRoleValue > static_cast(ClioNode::DbRole::MAX)) - throw std::runtime_error("Invalid db_role value"); + // Each field has a default value for backward compatibility + auto dbRole = ClioNode::DbRole::Fallback; + if (auto const* v = obj.if_contains(JsonFields::kDB_ROLE)) { + auto const dbRoleValue = v->as_int64(); + if (dbRoleValue > static_cast(ClioNode::DbRole::MAX)) + throw std::runtime_error("Invalid db_role value"); + dbRole = static_cast(dbRoleValue); + } - auto const etlStarted = jv.as_object().at(JsonFields::kETL_STARTED).as_bool(); - auto const cacheIsFull = jv.as_object().at(JsonFields::kCACHE_IS_FULL).as_bool(); + auto const etlStarted = + obj.contains(JsonFields::kETL_STARTED) ? obj.at(JsonFields::kETL_STARTED).as_bool() : true; + auto const cacheIsFull = obj.contains(JsonFields::kCACHE_IS_FULL) + ? obj.at(JsonFields::kCACHE_IS_FULL).as_bool() + : true; + auto const cacheIsCurrentlyLoading = obj.contains(JsonFields::kCACHE_IS_CURRENTLY_LOADING) + ? 
obj.at(JsonFields::kCACHE_IS_CURRENTLY_LOADING).as_bool() + : false; return ClioNode{ // Json data doesn't contain uuid so leaving it empty here. It will be filled outside of // this parsing .uuid = std::make_shared(), .updateTime = updateTime.value(), - .dbRole = static_cast(dbRoleValue), + .dbRole = dbRole, .etlStarted = etlStarted, - .cacheIsFull = cacheIsFull + .cacheIsFull = cacheIsFull, + .cacheIsCurrentlyLoading = cacheIsCurrentlyLoading }; } diff --git a/src/cluster/ClioNode.hpp b/src/cluster/ClioNode.hpp index f96668420..faafd9dba 100644 --- a/src/cluster/ClioNode.hpp +++ b/src/cluster/ClioNode.hpp @@ -19,6 +19,7 @@ #pragma once +#include "data/LedgerCacheLoadingState.hpp" #include "etl/WriterState.hpp" #include @@ -59,20 +60,26 @@ struct ClioNode { Uuid uuid; ///< The UUID of the node. std::chrono::system_clock::time_point - updateTime; ///< The time the data about the node was last updated. - DbRole dbRole; ///< The database role of the node - bool etlStarted; ///< Whether the ETL monitor has started on this node - bool cacheIsFull; ///< Whether the ledger cache is fully loaded on this node + updateTime; ///< The time the data about the node was last updated. + DbRole dbRole; ///< The database role of the node + bool etlStarted; ///< Whether the ETL monitor has started on this node + bool cacheIsFull; ///< Whether the ledger cache is fully loaded on this node + bool cacheIsCurrentlyLoading; ///< Whether this node is currently loading the ledger cache /** - * @brief Create a ClioNode from writer state. + * @brief Create a ClioNode from writer state and cache loading state. 
* * @param uuid The UUID of the node * @param writerState The writer state to determine the node's database role + * @param cacheLoadingState The cache loading state to determine if cache is being loaded * @return A ClioNode with the current time and role derived from writerState */ static ClioNode - from(Uuid uuid, etl::WriterStateInterface const& writerState); + from( + Uuid uuid, + etl::WriterStateInterface const& writerState, + data::LedgerCacheLoadingStateInterface const& cacheLoadingState + ); }; void diff --git a/src/cluster/ClusterCommunicationService.cpp b/src/cluster/ClusterCommunicationService.cpp index 0437f3cc6..c0640bc8c 100644 --- a/src/cluster/ClusterCommunicationService.cpp +++ b/src/cluster/ClusterCommunicationService.cpp @@ -20,7 +20,10 @@ #include "cluster/ClusterCommunicationService.hpp" #include "data/BackendInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" +#include "etl/SystemState.hpp" #include "etl/WriterState.hpp" +#include "util/config/ConfigDefinition.hpp" #include #include @@ -32,11 +35,20 @@ namespace cluster { ClusterCommunicationService::ClusterCommunicationService( std::shared_ptr backend, std::unique_ptr writerState, + std::unique_ptr cacheLoadingState, std::chrono::steady_clock::duration readInterval, std::chrono::steady_clock::duration writeInterval ) - : backend_(ctx_, std::move(backend), writerState->clone(), readInterval, writeInterval) + : backend_( + ctx_, + std::move(backend), + writerState->clone(), + cacheLoadingState->clone(), + readInterval, + writeInterval + ) , writerDecider_(ctx_, std::move(writerState)) + , cacheLoaderDecider_(ctx_, std::move(cacheLoadingState)) { } @@ -49,6 +61,9 @@ ClusterCommunicationService::run() backend_.subscribeToNewState([this](auto&&... args) { writerDecider_.onNewState(std::forward(args)...); }); + backend_.subscribeToNewState([this](auto&&... 
args) { + cacheLoaderDecider_.onNewState(std::forward(args)...); + }); backend_.run(); } @@ -63,4 +78,27 @@ ClusterCommunicationService::stop() backend_.stop(); } +ClusterCommunicationService::MakeResult +ClusterCommunicationService::make( + util::config::ClioConfigDefinition const& config, + std::shared_ptr backend, + std::shared_ptr state +) +{ + auto const& cache = backend->cache(); + auto cacheLoadingState = std::make_unique(cache); + if (not config.get("cache.limit_load_in_cluster")) { + cacheLoadingState->allowLoading(); + } + auto cacheLoadingStateClone = cacheLoadingState->clone(); + return MakeResult{ + .service = std::make_unique( + std::move(backend), + std::make_unique(std::move(state), cache), + std::move(cacheLoadingState) + ), + .cacheLoadingState = std::move(cacheLoadingStateClone) + }; +} + } // namespace cluster diff --git a/src/cluster/ClusterCommunicationService.hpp b/src/cluster/ClusterCommunicationService.hpp index d598a7336..f932b2326 100644 --- a/src/cluster/ClusterCommunicationService.hpp +++ b/src/cluster/ClusterCommunicationService.hpp @@ -20,11 +20,15 @@ #pragma once #include "cluster/Backend.hpp" +#include "cluster/CacheLoaderDecider.hpp" #include "cluster/Concepts.hpp" #include "cluster/Metrics.hpp" #include "cluster/WriterDecider.hpp" #include "data/BackendInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" +#include "etl/SystemState.hpp" #include "etl/WriterState.hpp" +#include "util/config/ConfigDefinition.hpp" #include #include @@ -48,6 +52,7 @@ class ClusterCommunicationService : public ClusterCommunicationServiceTag { Backend backend_; Metrics metrics_; WriterDecider writerDecider_; + CacheLoaderDecider cacheLoaderDecider_; public: static constexpr std::chrono::milliseconds kDEFAULT_READ_INTERVAL{1000}; @@ -58,12 +63,14 @@ public: * * @param backend The backend to use for communication. * @param writerState The state showing whether clio is writing to the database. 
+ * @param cacheLoadingState State controlling cache loading permission for this node. * @param readInterval The interval to read messages from the cluster. * @param writeInterval The interval to write messages to the cluster. */ ClusterCommunicationService( std::shared_ptr backend, std::unique_ptr writerState, + std::unique_ptr cacheLoadingState, std::chrono::steady_clock::duration readInterval = kDEFAULT_READ_INTERVAL, std::chrono::steady_clock::duration writeInterval = kDEFAULT_WRITE_INTERVAL ); @@ -88,6 +95,37 @@ public: */ void stop(); + + /** + * @brief Result of ClusterCommunicationService::make(). + * + * The @c cacheLoadingState is a clone whose allowLoading() is connected to the state owned by + * the service, so the caller can pass it to the cache loader. + */ + struct MakeResult { + std::unique_ptr service; ///< The constructed service + std::unique_ptr + cacheLoadingState; ///< Clone of cache loading state for use by the cache loader + }; + + /** + * @brief Factory method: construct the service and return a cache loading state for the caller. + * + * Reads the @c cache.limit_load_in_cluster config flag: if false, loading is immediately + * allowed (no cluster-wide limiting); if true, the cluster will gate permission via + * CacheLoaderDecider. 
+ * + * @param config The application configuration + * @param backend The data backend + * @param state The shared ETL system state + * @return A MakeResult containing the service and a cache loading state clone + */ + static MakeResult + make( + util::config::ClioConfigDefinition const& config, + std::shared_ptr backend, + std::shared_ptr state + ); }; } // namespace cluster diff --git a/src/data/CMakeLists.txt b/src/data/CMakeLists.txt index ef861f821..f0f687906 100644 --- a/src/data/CMakeLists.txt +++ b/src/data/CMakeLists.txt @@ -6,6 +6,7 @@ target_sources( BackendCounters.cpp BackendInterface.cpp LedgerCache.cpp + LedgerCacheLoadingState.cpp LedgerCacheSaver.cpp LedgerHeaderCache.cpp cassandra/impl/Future.cpp diff --git a/src/data/LedgerCache.cpp b/src/data/LedgerCache.cpp index c4549199d..16eaba568 100644 --- a/src/data/LedgerCache.cpp +++ b/src/data/LedgerCache.cpp @@ -224,6 +224,7 @@ LedgerCache::setFull() return; full_ = true; + isCurrentlyLoading_ = false; std::scoped_lock const lck{mtx_}; deletes_.clear(); } @@ -290,4 +291,16 @@ LedgerCache::loadFromFile(std::string const& path, uint32_t minLatestSequence) return {}; } +void +LedgerCache::startLoading() +{ + isCurrentlyLoading_ = true; +} + +bool +LedgerCache::isCurrentlyLoading() const +{ + return isCurrentlyLoading_; +} + } // namespace data diff --git a/src/data/LedgerCache.hpp b/src/data/LedgerCache.hpp index d4194cfd9..2b2110ad4 100644 --- a/src/data/LedgerCache.hpp +++ b/src/data/LedgerCache.hpp @@ -103,6 +103,14 @@ private: util::prometheus::Labels{}, "Whether ledger cache is disabled or not" )}; + util::prometheus::Bool isCurrentlyLoading_{ + PrometheusService::boolMetric( + "ledger_cache_is_currently_loading", + util::prometheus::Labels{}, + "Whether ledger cache is currently loading or not" + ) + + }; // temporary set to prevent background thread from writing already deleted data. 
not used when // cache is full @@ -159,6 +167,12 @@ public: std::expected loadFromFile(std::string const& path, uint32_t minLatestSequence) override; + + void + startLoading() override; + + [[nodiscard]] bool + isCurrentlyLoading() const override; }; } // namespace data diff --git a/src/data/LedgerCacheInterface.hpp b/src/data/LedgerCacheInterface.hpp index ec4cd67a1..7e9d07114 100644 --- a/src/data/LedgerCacheInterface.hpp +++ b/src/data/LedgerCacheInterface.hpp @@ -193,6 +193,21 @@ public: */ [[nodiscard]] virtual std::expected loadFromFile(std::string const& path, uint32_t minLatestSequence) = 0; + + /** + * @brief Mark the cache as currently loading from the backend. + * @note Should be called before initiating a backend-based cache load. The flag is + * automatically cleared when setFull() is called. + */ + virtual void + startLoading() = 0; + + /** + * @brief Check whether the cache is currently being loaded from the backend. + * @return true if startLoading() has been called and setFull() has not yet been called + */ + [[nodiscard]] virtual bool + isCurrentlyLoading() const = 0; }; } // namespace data diff --git a/src/data/LedgerCacheLoadingState.cpp b/src/data/LedgerCacheLoadingState.cpp new file mode 100644 index 000000000..4417490c6 --- /dev/null +++ b/src/data/LedgerCacheLoadingState.cpp @@ -0,0 +1,61 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "data/LedgerCacheLoadingState.hpp" + +#include + +namespace data { + +LedgerCacheLoadingState::LedgerCacheLoadingState(LedgerCacheInterface const& cache) : cache_(cache) +{ +} + +void +LedgerCacheLoadingState::allowLoading() +{ + *isLoadingAllowed_ = true; + isLoadingAllowed_->notify_all(); +} + +bool +LedgerCacheLoadingState::isLoadingAllowed() const +{ + return *isLoadingAllowed_; +} + +void +LedgerCacheLoadingState::waitForLoadingAllowed() const +{ + isLoadingAllowed_->wait(false); +} + +bool +LedgerCacheLoadingState::isCurrentlyLoading() const +{ + return cache_.get().isCurrentlyLoading(); +} + +std::unique_ptr +LedgerCacheLoadingState::clone() const +{ + return std::make_unique(*this); +} + +} // namespace data diff --git a/src/data/LedgerCacheLoadingState.hpp b/src/data/LedgerCacheLoadingState.hpp new file mode 100644 index 000000000..3b420beb9 --- /dev/null +++ b/src/data/LedgerCacheLoadingState.hpp @@ -0,0 +1,116 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/LedgerCacheInterface.hpp" + +#include +#include +#include + +namespace data { + +/** + * @brief Interface for coordinating cache loading permissions across a cluster. + * + * Controls whether this node is allowed to load the ledger cache from the backend. + * In a cluster, at most one node should load the cache at a time; this state is used + * to gate loading until permission is granted. + */ +class LedgerCacheLoadingStateInterface { +public: + virtual ~LedgerCacheLoadingStateInterface() = default; + + /** + * @brief Allow this node to begin loading the cache from the backend. + */ + virtual void + allowLoading() = 0; + + /** + * @brief Check whether loading has been permitted. + * @return true if allowLoading() has been called + */ + [[nodiscard]] virtual bool + isLoadingAllowed() const = 0; + + /** + * @brief Block until loading is permitted. + * @note Returns immediately if allowLoading() was already called. + */ + virtual void + waitForLoadingAllowed() const = 0; + + /** + * @brief Check whether the cache is currently being loaded from the backend. + * @return true if the underlying cache has been marked as loading and is not yet full + */ + [[nodiscard]] virtual bool + isCurrentlyLoading() const = 0; + + /** + * @brief Create a clone that shares the same loading-allowed flag. + * @note Clones share the @c isLoadingAllowed_ atomic, so allowLoading() on any + * copy is visible to all clones. 
+ * @return A new instance sharing the same loading permission state + */ + [[nodiscard]] virtual std::unique_ptr + clone() const = 0; +}; + +/** + * @brief Concrete implementation of @ref LedgerCacheLoadingStateInterface. + * + * Stores a reference to the ledger cache to delegate isCurrentlyLoading(), and a + * shared atomic flag for the loading-allowed coordination. + */ +class LedgerCacheLoadingState : public LedgerCacheLoadingStateInterface { + std::reference_wrapper cache_; + std::shared_ptr isLoadingAllowed_ = std::make_shared(false); + +public: + /** + * @brief Construct a new LedgerCacheLoadingState. + * @param cache The cache whose loading status will be monitored + */ + explicit LedgerCacheLoadingState(LedgerCacheInterface const& cache); + + /** @copydoc LedgerCacheLoadingStateInterface::allowLoading() */ + void + allowLoading() override; + + /** @copydoc LedgerCacheLoadingStateInterface::isLoadingAllowed() */ + [[nodiscard]] bool + isLoadingAllowed() const override; + + /** @copydoc LedgerCacheLoadingStateInterface::waitForLoadingAllowed() */ + void + waitForLoadingAllowed() const override; + + /** @copydoc LedgerCacheLoadingStateInterface::isCurrentlyLoading() */ + [[nodiscard]] bool + isCurrentlyLoading() const override; + + /** @copydoc LedgerCacheLoadingStateInterface::clone() */ + [[nodiscard]] std::unique_ptr + clone() const override; +}; + +} // namespace data diff --git a/src/etl/CacheLoader.hpp b/src/etl/CacheLoader.hpp index 34beadfdf..b919c4598 100644 --- a/src/etl/CacheLoader.hpp +++ b/src/etl/CacheLoader.hpp @@ -21,6 +21,7 @@ #include "data/BackendInterface.hpp" #include "data/LedgerCacheInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" #include "data/Types.hpp" #include "etl/CacheLoaderInterface.hpp" #include "etl/CacheLoaderSettings.hpp" @@ -60,6 +61,7 @@ class CacheLoader : public CacheLoaderInterface { std::reference_wrapper cache_; CacheLoaderSettings settings_; + std::unique_ptr cacheLoadingState_; ExecutionContextType ctx_; 
std::unique_ptr loader_; @@ -70,15 +72,18 @@ public: * @param config The configuration to use * @param backend The backend to use * @param cache The cache to load into + * @param cacheLoadingState State controlling whether loading from backend is currently allowed */ CacheLoader( util::config::ClioConfigDefinition const& config, std::shared_ptr backend, - data::LedgerCacheInterface& cache + data::LedgerCacheInterface& cache, + std::unique_ptr cacheLoadingState ) : backend_{std::move(backend)} , cache_{cache} , settings_{makeCacheLoaderSettings(config)} + , cacheLoadingState_(std::move(cacheLoadingState)) , ctx_{settings_.numThreads} { } @@ -109,6 +114,11 @@ public: return; } + LOG(log_.info()) << "Waiting for ledger cache loading to become allowed"; + cacheLoadingState_->waitForLoadingAllowed(); + LOG(log_.info()) << "Ledger cache loading is now allowed. Start loading..."; + cache_.get().startLoading(); + std::shared_ptr provider; if (settings_.numCacheCursorsFromDiff != 0) { LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_diff=" diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index d73b2d971..4629f80a2 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -21,6 +21,7 @@ #include "data/BackendInterface.hpp" #include "data/LedgerCacheInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" #include "data/Types.hpp" #include "etl/CacheLoader.hpp" #include "etl/CacheLoaderInterface.hpp" @@ -79,6 +80,7 @@ std::shared_ptr ETLService::makeETLService( util::config::ClioConfigDefinition const& config, std::shared_ptr state, + std::unique_ptr cacheLoadingState, util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr subscriptions, @@ -91,7 +93,9 @@ ETLService::makeETLService( auto fetcher = std::make_shared(backend, balancer); auto extractor = std::make_shared(fetcher); auto publisher = std::make_shared(ctx, backend, subscriptions, *state); - auto cacheLoader = std::make_shared>(config, backend, 
backend->cache()); + auto cacheLoader = std::make_shared>( + config, backend, backend->cache(), std::move(cacheLoadingState) + ); auto cacheUpdater = std::make_shared(backend->cache()); auto amendmentBlockHandler = std::make_shared(ctx, *state); auto monitorProvider = std::make_shared(); diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index 6260c0f44..10fe38b9b 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "data/LedgerCacheLoadingState.hpp" #include "data/Types.hpp" #include "etl/CacheLoaderInterface.hpp" #include "etl/CacheUpdaterInterface.hpp" @@ -134,6 +135,7 @@ public: * * @param config The configuration to use * @param state The system state tracking object + * @param cacheLoadingState State controlling whether this node is allowed to load the cache * @param ctx Execution context for asynchronous operations * @param backend BackendInterface implementation * @param subscriptions Subscription manager @@ -145,6 +147,7 @@ public: makeETLService( util::config::ClioConfigDefinition const& config, std::shared_ptr state, + std::unique_ptr cacheLoadingState, util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr subscriptions, diff --git a/src/util/config/ConfigDefinition.cpp b/src/util/config/ConfigDefinition.cpp index fd060c42e..babca5256 100644 --- a/src/util/config/ConfigDefinition.cpp +++ b/src/util/config/ConfigDefinition.cpp @@ -407,6 +407,7 @@ getClioConfig() ConfigValue{ConfigType::Integer}.defaultValue(0).withConstraint(gValidateNumCursors)}, {"cache.page_fetch_size", ConfigValue{ConfigType::Integer}.defaultValue(512).withConstraint(gValidateUint16)}, + {"cache.limit_load_in_cluster", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"cache.load", ConfigValue{ConfigType::String}.defaultValue("async").withConstraint(gValidateLoadMode)}, {"cache.file.path", ConfigValue{ConfigType::String}.optional()}, diff --git 
a/src/util/config/ConfigDescription.hpp b/src/util/config/ConfigDescription.hpp index 8903e1339..ecd0d3f83 100644 --- a/src/util/config/ConfigDescription.hpp +++ b/src/util/config/ConfigDescription.hpp @@ -326,6 +326,9 @@ This document provides a list of all available Clio configuration properties in "`cache.num_diffs`."}, KV{.key = "cache.page_fetch_size", .value = "The number of ledger objects to fetch concurrently per marker."}, + KV{.key = "cache.limit_load_in_cluster", + .value = "If enabled only one clio node in a cluster (sharing the same database) will " + "load cache at a time"}, KV{.key = "cache.load", .value = "The strategy used for Cache loading."}, KV{.key = "cache.file.path", .value = "The path to a file where cache will be saved to on shutdown and loaded from " diff --git a/tests/common/util/MockLedgerCache.hpp b/tests/common/util/MockLedgerCache.hpp index 1d55f37fa..e92945c32 100644 --- a/tests/common/util/MockLedgerCache.hpp +++ b/tests/common/util/MockLedgerCache.hpp @@ -104,4 +104,8 @@ struct MockLedgerCache : data::LedgerCacheInterface { (std::string const& path, uint32_t minLatestSequence), (override) ); + + MOCK_METHOD(void, startLoading, (), (override)); + + MOCK_METHOD(bool, isCurrentlyLoading, (), (const, override)); }; diff --git a/tests/common/util/MockLedgerCacheLoadingState.hpp b/tests/common/util/MockLedgerCacheLoadingState.hpp new file mode 100644 index 000000000..521c20e56 --- /dev/null +++ b/tests/common/util/MockLedgerCacheLoadingState.hpp @@ -0,0 +1,42 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. 
+ + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/LedgerCacheLoadingState.hpp" + +#include + +#include + +struct MockLedgerCacheLoadingStateBase : data::LedgerCacheLoadingStateInterface { + MOCK_METHOD(void, allowLoading, (), (override)); + MOCK_METHOD(bool, isLoadingAllowed, (), (const, override)); + MOCK_METHOD(void, waitForLoadingAllowed, (), (const, override)); + MOCK_METHOD(bool, isCurrentlyLoading, (), (const, override)); + MOCK_METHOD( + std::unique_ptr, + clone, + (), + (const, override) + ); +}; + +using MockLedgerCacheLoadingState = testing::StrictMock; +using NiceMockLedgerCacheLoadingState = testing::NiceMock; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 9966e37b3..06a775914 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -12,6 +12,7 @@ target_sources( data/BackendCountersTests.cpp data/BackendInterfaceTests.cpp data/LedgerCacheTests.cpp + data/LedgerCacheLoadingStateTests.cpp data/LedgerCacheSaverTests.cpp data/cassandra/AsyncExecutorTests.cpp data/cassandra/ExecutionStrategyTests.cpp @@ -23,6 +24,7 @@ target_sources( data/impl/OutputFileTests.cpp # Cluster cluster/BackendTests.cpp + cluster/CacheLoaderDeciderTests.cpp cluster/ClioNodeTests.cpp cluster/ClusterCommunicationServiceTests.cpp cluster/MetricsTests.cpp diff --git a/tests/unit/cluster/BackendTests.cpp b/tests/unit/cluster/BackendTests.cpp index 3cae7f54a..fb6d1b797 100644 --- 
a/tests/unit/cluster/BackendTests.cpp +++ b/tests/unit/cluster/BackendTests.cpp @@ -21,6 +21,7 @@ #include "cluster/ClioNode.hpp" #include "data/BackendInterface.hpp" #include "util/MockBackendTestFixture.hpp" +#include "util/MockLedgerCacheLoadingState.hpp" #include "util/MockPrometheus.hpp" #include "util/MockWriterState.hpp" @@ -53,6 +54,8 @@ struct ClusterBackendTest : util::prometheus::WithPrometheus, MockBackendTestStr boost::asio::thread_pool ctx; std::unique_ptr writerState = std::make_unique(); MockWriterState& writerStateRef = *writerState; + std::unique_ptr cacheLoadingState = + std::make_unique(); testing::StrictMock< testing::MockFunction)>> callbackMock; @@ -78,6 +81,7 @@ TEST_F(ClusterBackendTest, SubscribeToNewState) ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -122,6 +126,7 @@ TEST_F(ClusterBackendTest, Stop) ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -155,6 +160,7 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataThrowsException) ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -194,6 +200,7 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes) ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -205,7 +212,8 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes) "db_role": 2, "update_time": "2025-01-15T10:30:00Z", "etl_started": false, - "cache_is_full": false + "cache_is_full": false, + "cache_is_currently_loading": false })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData) @@ -272,6 +280,7 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsOnlySelfData) ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), 
std::chrono::milliseconds(1) }; @@ -282,7 +291,8 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsOnlySelfData) "db_role": 1, "update_time": "2025-01-16T10:30:00Z", "etl_started": false, - "cache_is_full": false + "cache_is_full": false, + "cache_is_currently_loading": false })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData).Times(testing::AtLeast(1)).WillRepeatedly([&]() { @@ -327,6 +337,7 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsInvalidJson) ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -378,6 +389,7 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsValidJsonButCannotConvertToC ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -385,9 +397,10 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsValidJsonButCannotConvertToC clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); auto const otherUuid = boost::uuids::random_generator{}(); - // Valid JSON but missing required field 'db_role' + // Valid JSON but db_role has wrong type (string instead of integer) auto const validJsonMissingField = R"JSON({ - "update_time": "2025-01-16T10:30:00Z" + "update_time": "2025-01-16T10:30:00Z", + "db_role": "writer" })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData) @@ -433,6 +446,7 @@ TEST_F(ClusterBackendTest, WriteNodeMessageWritesSelfDataWithRecentTimestampAndD ctx, backend_, std::move(writerState), + std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; @@ -476,3 +490,92 @@ TEST_F(ClusterBackendTest, WriteNodeMessageWritesSelfDataWithRecentTimestampAndD clusterBackend.run(); semaphore.acquire(); } + +TEST_F(ClusterBackendTest, WriteNodeMessageReflectsCacheIsCurrentlyLoading) +{ + auto& cacheLoadingStateRef = *cacheLoadingState; + Backend clusterBackend{ + ctx, + backend_, + std::move(writerState), + 
std::move(cacheLoadingState), + std::chrono::milliseconds(1), + std::chrono::milliseconds(1) + }; + + EXPECT_CALL(*backend_, fetchClioNodesData) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); + EXPECT_CALL(writerStateRef, isReadOnly) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(cacheLoadingStateRef, isCurrentlyLoading) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*backend_, writeNodeMessage) + .Times(testing::AtLeast(1)) + .WillRepeatedly([&](boost::uuids::uuid const&, std::string message) { + SemaphoreReleaseGuard const guard{semaphore}; + auto const json = boost::json::parse(message); + auto const node = boost::json::try_value_to(json); + ASSERT_TRUE(node.has_value()); + EXPECT_TRUE(node->cacheIsCurrentlyLoading); + }); + + clusterBackend.run(); + semaphore.acquire(); +} + +TEST_F(ClusterBackendTest, SubscribeToNewStateReflectsCacheIsCurrentlyLoading) +{ + auto& cacheLoadingStateRef = *cacheLoadingState; + Backend clusterBackend{ + ctx, + backend_, + std::move(writerState), + std::move(cacheLoadingState), + std::chrono::milliseconds(1), + std::chrono::milliseconds(1) + }; + + clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); + + EXPECT_CALL(*backend_, fetchClioNodesData) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); + EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); + EXPECT_CALL(writerStateRef, isReadOnly) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + 
.WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(cacheLoadingStateRef, isCurrentlyLoading) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(callbackMock, Call) + .Times(testing::AtLeast(1)) + .WillRepeatedly([this]( + ClioNode::CUuid selfId, + std::shared_ptr clusterData + ) { + SemaphoreReleaseGuard const guard{semaphore}; + ASSERT_TRUE(clusterData->has_value()); + ASSERT_EQ(clusterData->value().size(), 1); + auto const& selfNode = clusterData->value().front(); + EXPECT_EQ(selfNode.uuid, selfId); + EXPECT_TRUE(selfNode.cacheIsCurrentlyLoading); + }); + + clusterBackend.run(); + semaphore.acquire(); +} diff --git a/tests/unit/cluster/CacheLoaderDeciderTests.cpp b/tests/unit/cluster/CacheLoaderDeciderTests.cpp new file mode 100644 index 000000000..94c0a77c6 --- /dev/null +++ b/tests/unit/cluster/CacheLoaderDeciderTests.cpp @@ -0,0 +1,223 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#include "cluster/Backend.hpp" +#include "cluster/CacheLoaderDecider.hpp" +#include "cluster/ClioNode.hpp" +#include "util/MockLedgerCacheLoadingState.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +using namespace cluster; + +enum class CacheLoaderExpectedAction { AllowLoading, NoAction }; + +struct CacheLoaderNodeParams { + uint8_t uuidValue; + bool cacheIsFull = false; + bool cacheIsCurrentlyLoading = false; +}; + +struct CacheLoaderDeciderTestParams { + std::string testName; + uint8_t selfUuidValue; + std::vector nodes; + CacheLoaderExpectedAction expectedAction; + bool useEmptyClusterData = false; +}; + +struct CacheLoaderDeciderTest : testing::TestWithParam { + ~CacheLoaderDeciderTest() override + { + ctx.stop(); + ctx.join(); + } + + boost::asio::thread_pool ctx{1}; + std::unique_ptr cacheLoadingState = + std::make_unique(); + MockLedgerCacheLoadingState& cacheLoadingStateRef = *cacheLoadingState; + + static ClioNode + makeNode(boost::uuids::uuid const& uuid, bool cacheIsFull, bool cacheIsCurrentlyLoading) + { + return ClioNode{ + .uuid = std::make_shared(uuid), + .updateTime = std::chrono::system_clock::now(), + .dbRole = ClioNode::DbRole::NotWriter, + .etlStarted = true, + .cacheIsFull = cacheIsFull, + .cacheIsCurrentlyLoading = cacheIsCurrentlyLoading, + }; + } + + static boost::uuids::uuid + makeUuid(uint8_t value) + { + boost::uuids::uuid uuid{}; + std::ranges::fill(uuid, value); + return uuid; + } +}; + +TEST_P(CacheLoaderDeciderTest, CacheLoaderSelection) +{ + auto const& params = GetParam(); + + auto const selfUuid = makeUuid(params.selfUuidValue); + + CacheLoaderDecider decider{ctx, std::move(cacheLoadingState)}; + + auto clonedState = std::make_unique(); + + switch (params.expectedAction) { + case CacheLoaderExpectedAction::AllowLoading: + EXPECT_CALL(*clonedState, allowLoading()); + 
EXPECT_CALL(cacheLoadingStateRef, clone()) + .WillOnce(testing::Return(testing::ByMove(std::move(clonedState)))); + break; + case CacheLoaderExpectedAction::NoAction: + if (not params.useEmptyClusterData) { + EXPECT_CALL(cacheLoadingStateRef, clone()) + .WillOnce(testing::Return(testing::ByMove(std::move(clonedState)))); + } + break; + } + + std::shared_ptr clusterData; + ClioNode::CUuid selfIdPtr; + + if (params.useEmptyClusterData) { + clusterData = std::make_shared( + std::unexpected(std::string("Communication failed")) + ); + selfIdPtr = std::make_shared(selfUuid); + } else { + std::vector nodes; + nodes.reserve(params.nodes.size()); + for (auto const& nodeParam : params.nodes) { + auto node = makeNode( + makeUuid(nodeParam.uuidValue), + nodeParam.cacheIsFull, + nodeParam.cacheIsCurrentlyLoading + ); + if (nodeParam.uuidValue == params.selfUuidValue) { + selfIdPtr = node.uuid; + } + nodes.push_back(std::move(node)); + } + clusterData = std::make_shared(std::move(nodes)); + } + + decider.onNewState(selfIdPtr, clusterData); + + ctx.join(); +} + +INSTANTIATE_TEST_SUITE_P( + CacheLoaderDeciderTests, + CacheLoaderDeciderTest, + testing::Values( + CacheLoaderDeciderTestParams{ + .testName = "SelfCacheIsFullNoAction", + .selfUuidValue = 0x01, + .nodes = + {{.uuidValue = 0x01, .cacheIsFull = true}, + {.uuidValue = 0x02, .cacheIsFull = false}}, + .expectedAction = CacheLoaderExpectedAction::NoAction + }, + CacheLoaderDeciderTestParams{ + .testName = "SelfIsFirstNotFullByUuid_AllowLoading", + .selfUuidValue = 0x01, + .nodes = + {{.uuidValue = 0x01, .cacheIsFull = false}, + {.uuidValue = 0x02, .cacheIsFull = false}}, + .expectedAction = CacheLoaderExpectedAction::AllowLoading + }, + CacheLoaderDeciderTestParams{ + .testName = "OtherNodeIsFirstNotFullByUuid_NoAction", + .selfUuidValue = 0x02, + .nodes = + {{.uuidValue = 0x01, .cacheIsFull = false}, + {.uuidValue = 0x02, .cacheIsFull = false}}, + .expectedAction = CacheLoaderExpectedAction::NoAction + }, + 
CacheLoaderDeciderTestParams{ + .testName = "ShuffledNodes_SelfIsFirstNotFull_AllowLoading", + .selfUuidValue = 0x02, + .nodes = + {{.uuidValue = 0x04, .cacheIsFull = false}, + {.uuidValue = 0x02, .cacheIsFull = false}, + {.uuidValue = 0x03, .cacheIsFull = true}}, + .expectedAction = CacheLoaderExpectedAction::AllowLoading + }, + CacheLoaderDeciderTestParams{ + .testName = "FullNodesExcludedFromElection", + .selfUuidValue = 0x03, + .nodes = + {{.uuidValue = 0x01, .cacheIsFull = true}, + {.uuidValue = 0x02, .cacheIsFull = true}, + {.uuidValue = 0x03, .cacheIsFull = false}}, + .expectedAction = CacheLoaderExpectedAction::AllowLoading + }, + CacheLoaderDeciderTestParams{ + .testName = "SomeoneIsCurrentlyLoading_NoAction", + .selfUuidValue = 0x01, + .nodes = + {{.uuidValue = 0x01, .cacheIsFull = false, .cacheIsCurrentlyLoading = false}, + {.uuidValue = 0x02, .cacheIsFull = false, .cacheIsCurrentlyLoading = true}}, + .expectedAction = CacheLoaderExpectedAction::NoAction + }, + CacheLoaderDeciderTestParams{ + .testName = "SelfIsCurrentlyLoading_NoAction", + .selfUuidValue = 0x01, + .nodes = + {{.uuidValue = 0x01, .cacheIsFull = false, .cacheIsCurrentlyLoading = true}, + {.uuidValue = 0x02, .cacheIsFull = false, .cacheIsCurrentlyLoading = false}}, + .expectedAction = CacheLoaderExpectedAction::NoAction + }, + CacheLoaderDeciderTestParams{ + .testName = "SingleNodeCluster_SelfAllowLoading", + .selfUuidValue = 0x01, + .nodes = {{.uuidValue = 0x01, .cacheIsFull = false}}, + .expectedAction = CacheLoaderExpectedAction::AllowLoading + }, + CacheLoaderDeciderTestParams{ + .testName = "EmptyClusterData_NoAction", + .selfUuidValue = 0x01, + .nodes = {}, + .expectedAction = CacheLoaderExpectedAction::NoAction, + .useEmptyClusterData = true + } + ), + [](testing::TestParamInfo const& info) { + return info.param.testName; + } +); diff --git a/tests/unit/cluster/ClioNodeTests.cpp b/tests/unit/cluster/ClioNodeTests.cpp index 2b86d577e..c043de1d1 100644 --- 
a/tests/unit/cluster/ClioNodeTests.cpp +++ b/tests/unit/cluster/ClioNodeTests.cpp @@ -18,6 +18,7 @@ //============================================================================== #include "cluster/ClioNode.hpp" +#include "util/MockLedgerCacheLoadingState.hpp" #include "util/MockWriterState.hpp" #include "util/NameGenerator.hpp" #include "util/TimeUtils.hpp" @@ -53,7 +54,8 @@ TEST_F(ClioNodeTest, Serialization) .updateTime = updateTime, .dbRole = ClioNode::DbRole::Writer, .etlStarted = true, - .cacheIsFull = false + .cacheIsFull = false, + .cacheIsCurrentlyLoading = true }; boost::json::value jsonValue; @@ -74,6 +76,9 @@ TEST_F(ClioNodeTest, Serialization) EXPECT_TRUE(obj.contains("cache_is_full")); EXPECT_EQ(obj.at("cache_is_full").as_bool(), node.cacheIsFull); + + EXPECT_TRUE(obj.contains("cache_is_currently_loading")); + EXPECT_EQ(obj.at("cache_is_currently_loading").as_bool(), node.cacheIsCurrentlyLoading); } TEST_F(ClioNodeTest, Deserialization) @@ -82,7 +87,8 @@ TEST_F(ClioNodeTest, Deserialization) {"update_time", updateTimeStr}, {"db_role", 1}, {"etl_started", true}, - {"cache_is_full", false} + {"cache_is_full", false}, + {"cache_is_currently_loading", true} }; ClioNode node{ @@ -90,7 +96,8 @@ TEST_F(ClioNodeTest, Deserialization) .updateTime = {}, .dbRole = ClioNode::DbRole::ReadOnly, .etlStarted = false, - .cacheIsFull = false + .cacheIsFull = false, + .cacheIsCurrentlyLoading = false }; ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); @@ -100,6 +107,7 @@ TEST_F(ClioNodeTest, Deserialization) EXPECT_EQ(node.dbRole, ClioNode::DbRole::NotWriter); EXPECT_TRUE(node.etlStarted); EXPECT_FALSE(node.cacheIsFull); + EXPECT_TRUE(node.cacheIsCurrentlyLoading); } TEST_F(ClioNodeTest, DeserializationInvalidTime) @@ -120,15 +128,77 @@ TEST_F(ClioNodeTest, DeserializationMissingTime) TEST_F(ClioNodeTest, DeserializationMissingEtlStarted) { boost::json::value const jsonValue = { - {"update_time", updateTimeStr}, {"db_role", 1}, {"cache_is_full", false} + 
{"update_time", updateTimeStr}, + {"db_role", 1}, + {"cache_is_full", false}, + {"cache_is_currently_loading", false} }; - EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); + ClioNode node{}; + ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); + EXPECT_TRUE(node.etlStarted); // defaults to true } TEST_F(ClioNodeTest, DeserializationMissingCacheIsFull) { boost::json::value const jsonValue = { - {"update_time", updateTimeStr}, {"db_role", 1}, {"etl_started", true} + {"update_time", updateTimeStr}, + {"db_role", 1}, + {"etl_started", true}, + {"cache_is_currently_loading", false} + }; + ClioNode node{}; + ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); + EXPECT_TRUE(node.cacheIsFull); // defaults to true +} + +TEST_F(ClioNodeTest, DeserializationMissingCacheIsCurrentlyLoading) +{ + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, + {"db_role", 1}, + {"etl_started", true}, + {"cache_is_full", false} + }; + ClioNode node{}; + ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); + EXPECT_FALSE(node.cacheIsCurrentlyLoading); // defaults to false +} + +TEST_F(ClioNodeTest, DeserializationMissingDbRole) +{ + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, + {"etl_started", false}, + {"cache_is_full", false}, + {"cache_is_currently_loading", false} + }; + ClioNode node{}; + ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); + EXPECT_EQ(node.dbRole, ClioNode::DbRole::Fallback); // defaults to Fallback +} + +TEST_F(ClioNodeTest, DeserializationOldNodeFormat) +{ + // Old nodes (pre cluster-coordination release) only write update_time. + // Parsing must succeed with safe backward-compatible defaults. 
+ boost::json::value const jsonValue = {{"update_time", updateTimeStr}}; + ClioNode node{}; + ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); + EXPECT_EQ(node.updateTime, updateTime); + EXPECT_EQ(node.dbRole, ClioNode::DbRole::Fallback); + EXPECT_TRUE(node.etlStarted); + EXPECT_TRUE(node.cacheIsFull); + EXPECT_FALSE(node.cacheIsCurrentlyLoading); +} + +TEST_F(ClioNodeTest, DeserializationInvalidDbRole) +{ + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, + {"db_role", 10}, + {"etl_started", false}, + {"cache_is_full", false}, + {"cache_is_currently_loading", false} }; EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); } @@ -160,7 +230,8 @@ TEST_P(ClioNodeDbRoleTest, Serialization) .updateTime = updateTime, .dbRole = param.role, .etlStarted = false, - .cacheIsFull = false + .cacheIsFull = false, + .cacheIsCurrentlyLoading = false }; auto const jsonValue = boost::json::value_from(node); EXPECT_EQ(jsonValue.as_object().at("db_role").as_int64(), static_cast(param.role)); @@ -173,31 +244,13 @@ TEST_P(ClioNodeDbRoleTest, Deserialization) {"update_time", updateTimeStr}, {"db_role", static_cast(param.role)}, {"etl_started", false}, - {"cache_is_full", false} + {"cache_is_full", false}, + {"cache_is_currently_loading", false} }; auto const node = boost::json::value_to(jsonValue); EXPECT_EQ(node.dbRole, param.role); } -TEST_F(ClioNodeDbRoleTest, DeserializationInvalidDbRole) -{ - boost::json::value const jsonValue = { - {"update_time", updateTimeStr}, - {"db_role", 10}, - {"etl_started", false}, - {"cache_is_full", false} - }; - EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); -} - -TEST_F(ClioNodeDbRoleTest, DeserializationMissingDbRole) -{ - boost::json::value const jsonValue = { - {"update_time", updateTimeStr}, {"etl_started", false}, {"cache_is_full", false} - }; - EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); -} - struct ClioNodeFromTestBundle { std::string testName; bool 
readOnly; @@ -205,6 +258,7 @@ struct ClioNodeFromTestBundle { bool writing; bool etlStarted; bool cacheIsFull; + bool cacheIsCurrentlyLoading; ClioNode::DbRole expectedRole; }; @@ -213,6 +267,7 @@ struct ClioNodeFromTest : ClioNodeTest, testing::WithParamInterface(boost::uuids::random_generator()()); MockWriterState writerState; + MockLedgerCacheLoadingState cacheLoadingState; }; INSTANTIATE_TEST_SUITE_P( @@ -226,6 +281,7 @@ INSTANTIATE_TEST_SUITE_P( .writing = false, .etlStarted = false, .cacheIsFull = false, + .cacheIsCurrentlyLoading = false, .expectedRole = ClioNode::DbRole::ReadOnly }, ClioNodeFromTestBundle{ @@ -235,6 +291,7 @@ INSTANTIATE_TEST_SUITE_P( .writing = false, .etlStarted = false, .cacheIsFull = false, + .cacheIsCurrentlyLoading = false, .expectedRole = ClioNode::DbRole::Fallback }, ClioNodeFromTestBundle{ @@ -244,6 +301,7 @@ INSTANTIATE_TEST_SUITE_P( .writing = false, .etlStarted = true, .cacheIsFull = false, + .cacheIsCurrentlyLoading = false, .expectedRole = ClioNode::DbRole::NotWriter }, ClioNodeFromTestBundle{ @@ -253,6 +311,7 @@ INSTANTIATE_TEST_SUITE_P( .writing = true, .etlStarted = true, .cacheIsFull = true, + .cacheIsCurrentlyLoading = true, .expectedRole = ClioNode::DbRole::Writer } ), @@ -272,15 +331,18 @@ TEST_P(ClioNodeFromTest, FromWriterState) } EXPECT_CALL(writerState, isEtlStarted()).WillOnce(testing::Return(param.etlStarted)); EXPECT_CALL(writerState, isCacheFull()).WillOnce(testing::Return(param.cacheIsFull)); + EXPECT_CALL(cacheLoadingState, isCurrentlyLoading()) + .WillOnce(testing::Return(param.cacheIsCurrentlyLoading)); auto const beforeTime = std::chrono::system_clock::now(); - auto const node = ClioNode::from(uuid, writerState); + auto const node = ClioNode::from(uuid, writerState, cacheLoadingState); auto const afterTime = std::chrono::system_clock::now(); EXPECT_EQ(node.uuid, uuid); EXPECT_EQ(node.dbRole, param.expectedRole); EXPECT_EQ(node.etlStarted, param.etlStarted); EXPECT_EQ(node.cacheIsFull, param.cacheIsFull); + 
EXPECT_EQ(node.cacheIsCurrentlyLoading, param.cacheIsCurrentlyLoading); EXPECT_GE(node.updateTime, beforeTime); EXPECT_LE(node.updateTime, afterTime); } diff --git a/tests/unit/cluster/ClusterCommunicationServiceTests.cpp b/tests/unit/cluster/ClusterCommunicationServiceTests.cpp index e6c7b719e..80ec4873d 100644 --- a/tests/unit/cluster/ClusterCommunicationServiceTests.cpp +++ b/tests/unit/cluster/ClusterCommunicationServiceTests.cpp @@ -20,9 +20,15 @@ #include "cluster/ClioNode.hpp" #include "cluster/ClusterCommunicationService.hpp" #include "data/BackendInterface.hpp" +#include "etl/SystemState.hpp" #include "util/MockBackendTestFixture.hpp" +#include "util/MockLedgerCacheLoadingState.hpp" #include "util/MockPrometheus.hpp" #include "util/MockWriterState.hpp" +#include "util/NameGenerator.hpp" +#include "util/config/ConfigDefinition.hpp" +#include "util/config/ConfigValue.hpp" +#include "util/config/Types.hpp" #include "util/prometheus/Prometheus.hpp" #include @@ -48,6 +54,8 @@ using namespace cluster; struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockBackendTest { std::unique_ptr writerState = std::make_unique(); NiceMockWriterState& writerStateRef = *writerState; + std::unique_ptr cacheLoadingState = + std::make_unique(); static constexpr std::chrono::milliseconds kSHORT_INTERVAL{1}; @@ -68,6 +76,7 @@ struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockB .dbRole = role, .etlStarted = true, .cacheIsFull = true, + .cacheIsCurrentlyLoading = false, }; } @@ -88,6 +97,9 @@ struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockB })); ON_CALL(writerStateRef, isReadOnly()).WillByDefault(testing::Return(false)); ON_CALL(writerStateRef, isWriting()).WillByDefault(testing::Return(true)); + ON_CALL(*cacheLoadingState, clone()).WillByDefault(testing::Invoke([]() { + return std::make_unique(); + })); } static bool @@ -122,7 +134,11 @@ TEST_F(ClusterCommunicationServiceTest, 
BackendReadsAndWritesData) })); ClusterCommunicationService service{ - backend_, std::move(writerState), kSHORT_INTERVAL, kSHORT_INTERVAL + backend_, + std::move(writerState), + std::move(cacheLoadingState), + kSHORT_INTERVAL, + kSHORT_INTERVAL }; service.run(); @@ -163,7 +179,11 @@ TEST_F(ClusterCommunicationServiceTest, MetricsGetsNewStateFromBackend) auto isHealthyMetric = PrometheusService::boolMetric("cluster_communication_is_healthy", {}); ClusterCommunicationService service{ - backend_, std::move(writerState), kSHORT_INTERVAL, kSHORT_INTERVAL + backend_, + std::move(writerState), + std::move(cacheLoadingState), + kSHORT_INTERVAL, + kSHORT_INTERVAL }; service.run(); @@ -208,7 +228,11 @@ TEST_F(ClusterCommunicationServiceTest, WriterDeciderCallsWriterStateMethodsAcco })); ClusterCommunicationService service{ - backend_, std::move(writerState), kSHORT_INTERVAL, kSHORT_INTERVAL + backend_, + std::move(writerState), + std::move(cacheLoadingState), + kSHORT_INTERVAL, + kSHORT_INTERVAL }; service.run(); @@ -238,7 +262,11 @@ TEST_F(ClusterCommunicationServiceTest, StopHaltsBackendOperations) })); ClusterCommunicationService service{ - backend_, std::move(writerState), kSHORT_INTERVAL, kSHORT_INTERVAL + backend_, + std::move(writerState), + std::move(cacheLoadingState), + kSHORT_INTERVAL, + kSHORT_INTERVAL }; service.run(); @@ -249,3 +277,47 @@ TEST_F(ClusterCommunicationServiceTest, StopHaltsBackendOperations) std::this_thread::sleep_for(std::chrono::milliseconds{50}); EXPECT_EQ(backendOperationsCount.load(), countAfterStop); } + +struct ClusterCommunicationServiceMakeTestBundle { + std::string testName; + bool limitLoadInCluster; +}; + +struct ClusterCommunicationServiceMakeTest + : util::prometheus::WithPrometheus, + MockBackendTest, + testing::WithParamInterface { + std::shared_ptr systemState = std::make_shared(); +}; + +INSTANTIATE_TEST_SUITE_P( + LimitLoadInCluster, + ClusterCommunicationServiceMakeTest, + testing::Values( + 
ClusterCommunicationServiceMakeTestBundle{ + .testName = "AllowsLoadingWhenTrue", + .limitLoadInCluster = true + }, + ClusterCommunicationServiceMakeTestBundle{ + .testName = "DoesNotAllowLoadingWhenFalse", + .limitLoadInCluster = false + } + ), + tests::util::kNAME_GENERATOR +); + +TEST_P(ClusterCommunicationServiceMakeTest, LoadingAllowedMatchesConfig) +{ + auto const& param = GetParam(); + util::config::ClioConfigDefinition config{ + {{"cache.limit_load_in_cluster", + util::config::ConfigValue{util::config::ConfigType::Boolean}.defaultValue( + param.limitLoadInCluster + )}} + }; + + auto result = ClusterCommunicationService::make(config, backend_, systemState); + + ASSERT_NE(result.cacheLoadingState, nullptr); + EXPECT_EQ(result.cacheLoadingState->isLoadingAllowed(), not param.limitLoadInCluster); +} diff --git a/tests/unit/cluster/MetricsTests.cpp b/tests/unit/cluster/MetricsTests.cpp index 8f18dc549..74072cdfb 100644 --- a/tests/unit/cluster/MetricsTests.cpp +++ b/tests/unit/cluster/MetricsTests.cpp @@ -73,21 +73,24 @@ TEST_F(MetricsTest, OnNewStateWithValidClusterData) .updateTime = std::chrono::system_clock::now(), .dbRole = ClioNode::DbRole::Writer, .etlStarted = true, - .cacheIsFull = true + .cacheIsFull = true, + .cacheIsCurrentlyLoading = false }; ClioNode const node2{ .uuid = uuid2, .updateTime = std::chrono::system_clock::now(), .dbRole = ClioNode::DbRole::ReadOnly, .etlStarted = true, - .cacheIsFull = true + .cacheIsFull = true, + .cacheIsCurrentlyLoading = false }; ClioNode const node3{ .uuid = uuid3, .updateTime = std::chrono::system_clock::now(), .dbRole = ClioNode::DbRole::NotWriter, .etlStarted = true, - .cacheIsFull = false + .cacheIsFull = false, + .cacheIsCurrentlyLoading = false }; std::vector const nodes = {node1, node2, node3}; @@ -157,7 +160,8 @@ TEST_F(MetricsTest, OnNewStateWithSingleNode) .updateTime = std::chrono::system_clock::now(), .dbRole = ClioNode::DbRole::Writer, .etlStarted = true, - .cacheIsFull = false + .cacheIsFull = false, 
+ .cacheIsCurrentlyLoading = false }; std::vector const nodes = {node1}; @@ -195,14 +199,16 @@ TEST_F(MetricsTest, OnNewStateRecoveryFromFailure) .updateTime = std::chrono::system_clock::now(), .dbRole = ClioNode::DbRole::Writer, .etlStarted = true, - .cacheIsFull = true + .cacheIsFull = true, + .cacheIsCurrentlyLoading = false }; ClioNode const node2{ .uuid = uuid2, .updateTime = std::chrono::system_clock::now(), .dbRole = ClioNode::DbRole::ReadOnly, .etlStarted = true, - .cacheIsFull = false + .cacheIsFull = false, + .cacheIsCurrentlyLoading = false }; std::vector const nodes = {node1, node2}; diff --git a/tests/unit/cluster/WriterDeciderTests.cpp b/tests/unit/cluster/WriterDeciderTests.cpp index 2360c224a..007c27815 100644 --- a/tests/unit/cluster/WriterDeciderTests.cpp +++ b/tests/unit/cluster/WriterDeciderTests.cpp @@ -44,6 +44,7 @@ struct NodeParams { ClioNode::DbRole role; bool etlStarted = true; bool cacheIsFull = true; + bool cacheIsCurrentlyLoading = false; }; struct WriterDeciderTestParams { @@ -78,7 +79,8 @@ struct WriterDeciderTest : testing::TestWithParam { .updateTime = std::chrono::system_clock::now(), .dbRole = role, .etlStarted = etlStarted, - .cacheIsFull = cacheIsFull + .cacheIsFull = cacheIsFull, + .cacheIsCurrentlyLoading = false }; } diff --git a/tests/unit/data/LedgerCacheLoadingStateTests.cpp b/tests/unit/data/LedgerCacheLoadingStateTests.cpp new file mode 100644 index 000000000..83b562735 --- /dev/null +++ b/tests/unit/data/LedgerCacheLoadingStateTests.cpp @@ -0,0 +1,102 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2026, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. 
+ + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "data/LedgerCache.hpp" +#include "data/LedgerCacheLoadingState.hpp" +#include "util/MockPrometheus.hpp" + +#include + +#include +#include +#include + +using namespace data; + +struct LedgerCacheLoadingStateTest : util::prometheus::WithPrometheus { + LedgerCache cache; + LedgerCacheLoadingState state{cache}; +}; + +TEST_F(LedgerCacheLoadingStateTest, DefaultStateLoadingNotAllowed) +{ + EXPECT_FALSE(state.isLoadingAllowed()); +} + +TEST_F(LedgerCacheLoadingStateTest, AllowLoadingPermitsLoading) +{ + state.allowLoading(); + EXPECT_TRUE(state.isLoadingAllowed()); +} + +TEST_F(LedgerCacheLoadingStateTest, IsCurrentlyLoadingDelegatesToCache) +{ + EXPECT_FALSE(state.isCurrentlyLoading()); + cache.startLoading(); + EXPECT_TRUE(state.isCurrentlyLoading()); + cache.setFull(); + EXPECT_FALSE(state.isCurrentlyLoading()); +} + +TEST_F(LedgerCacheLoadingStateTest, CloneSharesAllowedFlag) +{ + auto clone = state.clone(); + ASSERT_NE(clone, nullptr); + + EXPECT_FALSE(clone->isLoadingAllowed()); + state.allowLoading(); + EXPECT_TRUE(clone->isLoadingAllowed()); +} + +TEST_F(LedgerCacheLoadingStateTest, AllowLoadingOnCloneVisibleToOriginal) +{ + auto clone = state.clone(); + clone->allowLoading(); + EXPECT_TRUE(state.isLoadingAllowed()); +} + +TEST_F(LedgerCacheLoadingStateTest, WaitForLoadingAllowedReturnsIfAlreadyAllowed) +{ + state.allowLoading(); + std::binary_semaphore done{0}; + 
std::thread t{[&] { + state.waitForLoadingAllowed(); + done.release(); + }}; + EXPECT_TRUE(done.try_acquire_for(std::chrono::milliseconds{1000})); + t.join(); +} + +TEST_F(LedgerCacheLoadingStateTest, WaitForLoadingAllowedUnblocksWhenAllowed) +{ + std::binary_semaphore started{0}; + std::binary_semaphore done{0}; + std::thread waiter{[&] { + started.release(); + state.waitForLoadingAllowed(); + done.release(); + }}; + + EXPECT_TRUE(started.try_acquire_for(std::chrono::milliseconds{1000})); + EXPECT_FALSE(done.try_acquire_for(std::chrono::milliseconds{10})); + + state.allowLoading(); + EXPECT_TRUE(done.try_acquire_for(std::chrono::milliseconds{1000})); + waiter.join(); +} diff --git a/tests/unit/data/LedgerCacheTests.cpp b/tests/unit/data/LedgerCacheTests.cpp index 289c59610..43ba5abd0 100644 --- a/tests/unit/data/LedgerCacheTests.cpp +++ b/tests/unit/data/LedgerCacheTests.cpp @@ -41,10 +41,26 @@ TEST_F(LedgerCacheTest, defaultState) { EXPECT_FALSE(cache.isDisabled()); EXPECT_FALSE(cache.isFull()); + EXPECT_FALSE(cache.isCurrentlyLoading()); EXPECT_EQ(cache.size(), 0u); EXPECT_EQ(cache.latestLedgerSequence(), 0u); } +TEST_F(LedgerCacheTest, startLoadingSetsIsCurrentlyLoading) +{ + EXPECT_FALSE(cache.isCurrentlyLoading()); + cache.startLoading(); + EXPECT_TRUE(cache.isCurrentlyLoading()); +} + +TEST_F(LedgerCacheTest, setFullResetsIsCurrentlyLoading) +{ + cache.startLoading(); + ASSERT_TRUE(cache.isCurrentlyLoading()); + cache.setFull(); + EXPECT_FALSE(cache.isCurrentlyLoading()); +} + struct LedgerCachePrometheusMetricTest : util::prometheus::WithMockPrometheus { LedgerCache cache; }; @@ -64,15 +80,28 @@ TEST_F(LedgerCachePrometheusMetricTest, setFull) { auto& fullMock = makeMock("ledger_cache_full", {}); auto& disabledMock = makeMock("ledger_cache_disabled", {}); + auto& loadingMock = makeMock("ledger_cache_is_currently_loading", {}); EXPECT_CALL(disabledMock, value()).WillOnce(testing::Return(0)); EXPECT_CALL(fullMock, set(1)); + EXPECT_CALL(loadingMock, set(0)); 
cache.setFull(); EXPECT_CALL(fullMock, value()).WillOnce(testing::Return(1)); EXPECT_TRUE(cache.isFull()); } +TEST_F(LedgerCachePrometheusMetricTest, startLoading) +{ + auto& loadingMock = makeMock("ledger_cache_is_currently_loading", {}); + + EXPECT_CALL(loadingMock, set(1)); + cache.startLoading(); + + EXPECT_CALL(loadingMock, value()).WillOnce(testing::Return(1)); + EXPECT_TRUE(cache.isCurrentlyLoading()); +} + struct LedgerCacheSaveLoadTest : LedgerCacheTest { ripple::uint256 const key1{1}; ripple::uint256 const key2{2}; diff --git a/tests/unit/etl/CacheLoaderTests.cpp b/tests/unit/etl/CacheLoaderTests.cpp index c0e9d9a85..f82dfbcec 100644 --- a/tests/unit/etl/CacheLoaderTests.cpp +++ b/tests/unit/etl/CacheLoaderTests.cpp @@ -24,6 +24,7 @@ #include "etl/impl/CacheLoader.hpp" #include "util/MockBackendTestFixture.hpp" #include "util/MockLedgerCache.hpp" +#include "util/MockLedgerCacheLoadingState.hpp" #include "util/MockPrometheus.hpp" #include "util/async/context/BasicExecutionContext.hpp" #include "util/config/ConfigDefinition.hpp" @@ -38,8 +39,10 @@ #include #include +#include #include #include +#include #include namespace json = boost::json; @@ -81,7 +84,10 @@ constexpr auto kSEQ = 30; struct CacheLoaderTest : util::prometheus::WithPrometheus, MockBackendTest { DiffProvider diffProvider; - MockLedgerCache cache; + testing::StrictMock cache; + std::unique_ptr cacheLoadingState = + std::make_unique(); + MockLedgerCacheLoadingState& cacheLoadingStateRef = *cacheLoadingState; }; using Settings = etl::CacheLoaderSettings; @@ -304,7 +310,7 @@ TEST_P(ParametrizedCacheLoaderTest, CacheDisabledLeadsToCancellation) TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded) { auto const cfg = getParseCacheConfig(json::parse(R"JSON({"cache": {"load": "sync"}})JSON")); - CacheLoader<> loader{cfg, backend_, cache}; + CacheLoader<> loader{cfg, backend_, cache, std::move(cacheLoadingState)}; auto const diffs = diffProvider.getLatestDiff(); auto const loops = diffs.size() 
+ 1; @@ -323,6 +329,8 @@ TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded) EXPECT_CALL(cache, updateImpl).Times(loops); EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true)); EXPECT_CALL(cache, setFull).Times(1); + EXPECT_CALL(cache, startLoading).Times(1); + EXPECT_CALL(cacheLoadingStateRef, waitForLoadingAllowed()); loader.load(kSEQ); } @@ -330,7 +338,7 @@ TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded) TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped) { auto const cfg = getParseCacheConfig(json::parse(R"JSON({"cache": {"load": "async"}})JSON")); - CacheLoader loader{cfg, backend_, cache}; + CacheLoader<> loader{cfg, backend_, cache, std::move(cacheLoadingState)}; auto const diffs = diffProvider.getLatestDiff(); auto const loops = diffs.size() + 1; @@ -349,6 +357,8 @@ TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped) EXPECT_CALL(cache, updateImpl).Times(AtMost(loops)); EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false)); EXPECT_CALL(cache, setFull).Times(AtMost(1)); + EXPECT_CALL(cache, startLoading).Times(1); + EXPECT_CALL(cacheLoadingStateRef, waitForLoadingAllowed()).Times(1); loader.load(kSEQ); loader.stop(); @@ -358,7 +368,7 @@ TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped) TEST_F(CacheLoaderTest, DisabledCacheLoaderDoesNotLoadCache) { auto const cfg = getParseCacheConfig(json::parse(R"JSON({"cache": {"load": "none"}})JSON")); - CacheLoader loader{cfg, backend_, cache}; + CacheLoader<> loader{cfg, backend_, cache, std::move(cacheLoadingState)}; EXPECT_CALL(cache, updateImpl).Times(0); EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false)); @@ -370,7 +380,7 @@ TEST_F(CacheLoaderTest, DisabledCacheLoaderDoesNotLoadCache) TEST_F(CacheLoaderTest, DisabledCacheLoaderCanCallStopAndWait) { auto const cfg = getParseCacheConfig(json::parse(R"JSON({"cache": {"load": "none"}})JSON")); - CacheLoader loader{cfg, backend_, cache}; + CacheLoader<> loader{cfg, backend_, cache, 
std::move(cacheLoadingState)}; EXPECT_CALL(cache, updateImpl).Times(0); EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false)); @@ -399,7 +409,7 @@ struct CacheLoaderFromFileTest : CacheLoaderTest { ) ) ); - CacheLoader<> loader{cfg, backend_, cache}; + CacheLoader<> loader{cfg, backend_, cache, std::move(cacheLoadingState)}; }; TEST_F(CacheLoaderFromFileTest, Success) @@ -428,6 +438,7 @@ TEST_F(CacheLoaderFromFileTest, FailureBackToNormalLoad) EXPECT_CALL(cache, loadFromFile(filePath, kSEQ - maxSequenceLag)) .WillOnce(Return(std::expected(std::unexpected("File not found")))); + EXPECT_CALL(cacheLoadingStateRef, waitForLoadingAllowed()).Times(1); EXPECT_CALL(*backend_, fetchLedgerDiff(_, _)).Times(32).WillRepeatedly(Return(diffs)); EXPECT_CALL(*backend_, doFetchSuccessorKey).Times(keysSize * loops).WillRepeatedly([this]() { return diffProvider.nextKey(keysSize); @@ -441,6 +452,7 @@ TEST_F(CacheLoaderFromFileTest, FailureBackToNormalLoad) EXPECT_CALL(cache, updateImpl).Times(loops); EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true)); EXPECT_CALL(cache, setFull).Times(1); + EXPECT_CALL(cache, startLoading).Times(1); loader.load(kSEQ); } @@ -450,7 +462,9 @@ TEST_F(CacheLoaderFromFileTest, DontLoadWhenCacheIsDisabled) auto const disabledCacheCfg = getParseCacheConfig( json::parse(R"JSON({"cache": {"load": "none", "file": {"path": "/tmp/cache.bin"}}})JSON") ); - CacheLoader loaderWithCacheDisabled{disabledCacheCfg, backend_, cache}; + CacheLoader<> loaderWithCacheDisabled{ + disabledCacheCfg, backend_, cache, std::make_unique() + }; EXPECT_CALL(cache, isFull).WillOnce(Return(false)); EXPECT_CALL(cache, setDisabled); @@ -466,6 +480,7 @@ TEST_F(CacheLoaderFromFileTest, MaxSequenceLagCalculation) EXPECT_CALL(cache, loadFromFile(filePath, kSEQ - maxSequenceLag)) .WillOnce(Return(std::expected{})); EXPECT_CALL(cache, latestLedgerSequence).WillOnce(Return(kLOADED_SEQ)); + EXPECT_CALL(cache, setFull).Times(1); loader.load(kSEQ); } @@ -506,6 
+521,7 @@ TEST_F(CacheLoaderFromFileTest, MaxSequenceLagClampedToMinOfLedgerRange) EXPECT_CALL(cache, loadFromFile(filePath, minSeq)) .WillOnce(Return(std::expected{})); EXPECT_CALL(cache, latestLedgerSequence).WillOnce(Return(minSeq + 1)); + EXPECT_CALL(cache, setFull).Times(1); loader.load(currentSeq); }