diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index efa136263..5ef8a5ba4 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -126,7 +126,7 @@ ClioApplication::run(bool const useNgWebServer) auto systemState = etl::SystemState::makeSystemState(config_); cluster::ClusterCommunicationService clusterCommunicationService{ - backend, std::make_unique(systemState) + backend, std::make_unique(systemState, cache) }; clusterCommunicationService.run(); diff --git a/src/cluster/ClioNode.cpp b/src/cluster/ClioNode.cpp index 1ea7d3b4b..3bfd4b12b 100644 --- a/src/cluster/ClioNode.cpp +++ b/src/cluster/ClioNode.cpp @@ -42,6 +42,8 @@ namespace { struct JsonFields { static constexpr std::string_view const kUPDATE_TIME = "update_time"; static constexpr std::string_view const kDB_ROLE = "db_role"; + static constexpr std::string_view const kETL_STARTED = "etl_started"; + static constexpr std::string_view const kCACHE_IS_FULL = "cache_is_full"; }; } // namespace @@ -56,14 +58,15 @@ ClioNode::from(ClioNode::Uuid uuid, etl::WriterStateInterface const& writerState if (writerState.isFallback()) { return ClioNode::DbRole::Fallback; } - if (writerState.isLoadingCache()) { - return ClioNode::DbRole::LoadingCache; - } return writerState.isWriting() ? ClioNode::DbRole::Writer : ClioNode::DbRole::NotWriter; }(); return ClioNode{ - .uuid = std::move(uuid), .updateTime = std::chrono::system_clock::now(), .dbRole = dbRole + .uuid = std::move(uuid), + .updateTime = std::chrono::system_clock::now(), + .dbRole = dbRole, + .etlStarted = writerState.isEtlStarted(), + .cacheIsFull = writerState.isCacheFull() }; } @@ -72,7 +75,9 @@ tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& { jv = { {JsonFields::kUPDATE_TIME, util::systemTpToUtcStr(node.updateTime, ClioNode::kTIME_FORMAT)}, - {JsonFields::kDB_ROLE, static_cast(node.dbRole)} + {JsonFields::kDB_ROLE, static_cast(node.dbRole)}, + {JsonFields::kETL_STARTED, node.etlStarted}, + {JsonFields::kCACHE_IS_FULL, node.cacheIsFull} }; } @@ -90,12 +95,17 @@ tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) if (dbRoleValue > static_cast(ClioNode::DbRole::MAX)) throw std::runtime_error("Invalid db_role value"); + auto const etlStarted = jv.as_object().at(JsonFields::kETL_STARTED).as_bool(); + auto const cacheIsFull = jv.as_object().at(JsonFields::kCACHE_IS_FULL).as_bool(); + return ClioNode{ // Json data doesn't contain uuid so leaving it empty here. It will be filled outside of // this parsing .uuid = std::make_shared(), .updateTime = updateTime.value(), - .dbRole = static_cast(dbRoleValue) + .dbRole = static_cast(dbRoleValue), + .etlStarted = etlStarted, + .cacheIsFull = cacheIsFull }; } diff --git a/src/cluster/ClioNode.hpp b/src/cluster/ClioNode.hpp index 9bccf7007..f96668420 100644 --- a/src/cluster/ClioNode.hpp +++ b/src/cluster/ClioNode.hpp @@ -52,22 +52,17 @@ struct ClioNode { * from the cluster communication mechanism to the slower but more reliable * database-based conflict detection mechanism. */ - enum class DbRole { - ReadOnly = 0, - LoadingCache = 1, - NotWriter = 2, - Writer = 3, - Fallback = 4, - MAX = 4 - }; + enum class DbRole { ReadOnly = 0, NotWriter = 1, Writer = 2, Fallback = 3, MAX = 3 }; using Uuid = std::shared_ptr; using CUuid = std::shared_ptr; Uuid uuid; ///< The UUID of the node. std::chrono::system_clock::time_point - updateTime; ///< The time the data about the node was last updated. - DbRole dbRole; ///< The database role of the node + updateTime; ///< The time the data about the node was last updated. + DbRole dbRole; ///< The database role of the node + bool etlStarted; ///< Whether the ETL monitor has started on this node + bool cacheIsFull; ///< Whether the ledger cache is fully loaded on this node /** * @brief Create a ClioNode from writer state. diff --git a/src/cluster/WriterDecider.cpp b/src/cluster/WriterDecider.cpp index de11d2f4b..64c5e8531 100644 --- a/src/cluster/WriterDecider.cpp +++ b/src/cluster/WriterDecider.cpp @@ -84,21 +84,36 @@ WriterDecider::onNewState( return *lhs.uuid < *rhs.uuid; }); - auto const it = std::ranges::find_if(clusterData, [](ClioNode const& node) { - return node.dbRole == ClioNode::DbRole::NotWriter or - node.dbRole == ClioNode::DbRole::Writer; + auto it = std::ranges::find_if(clusterData, [](ClioNode const& node) { + return node.etlStarted and node.cacheIsFull and + (node.dbRole == ClioNode::DbRole::NotWriter or + node.dbRole == ClioNode::DbRole::Writer); }); - if (it == clusterData.end()) { - // No writer nodes in the cluster yet + auto electNode = [&selfId, &writerState](auto it) { + if (*it->uuid == *selfId) { + writerState->startWriting(); + } else { + writerState->giveUpWriting(); + } + }; + if (it != clusterData.end()) { + electNode(it); return; } - if (*it->uuid == *selfId) { - writerState->startWriting(); - } else { - writerState->giveUpWriting(); + // Try to find a node with at least started ETL + it = std::ranges::find_if(clusterData, [](ClioNode const& node) { + return node.etlStarted and + (node.dbRole == ClioNode::DbRole::NotWriter or + node.dbRole == ClioNode::DbRole::Writer); + }); + + if (it != clusterData.end()) { + electNode(it); + return; } + writerState->giveUpWriting(); } ); } diff --git a/src/etl/CacheLoader.hpp b/src/etl/CacheLoader.hpp index 147efd2dd..34beadfdf 100644 --- a/src/etl/CacheLoader.hpp +++ b/src/etl/CacheLoader.hpp @@ -103,6 +103,9 @@ public: } if (loadCacheFromFile()) { + // Cache file may contain outdated data, so fetch whatever left up to seq from DB + updateCacheToSeq(seq); + cache_.get().setFull(); return; } @@ -191,9 +194,23 @@ private: LOG(log_.info()) << "Loaded cache from file in " << duration_ms << " ms. Latest sequence: " << cache_.get().latestLedgerSequence(); - backend_->forceUpdateRange(cache_.get().latestLedgerSequence()); return true; } + + void + updateCacheToSeq(uint32_t const seq) + { + while (cache_.get().latestLedgerSequence() < seq) { + auto const seqToLoad = cache_.get().latestLedgerSequence() + 1; + LOG(log_.info()) << "Fetching ledger " << seqToLoad + << "from DB after loading cache from file"; + auto const diff = data::synchronousAndRetryOnTimeout([this, seqToLoad](auto yield) { + return backend_->fetchLedgerDiff(seqToLoad, yield); + }); + cache_.get().update(diff, seqToLoad); + LOG(log_.info()) << "Updated cache to " << seqToLoad; + } + } }; } // namespace etl diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index b43b146c2..d73b2d971 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -215,13 +215,13 @@ ETLService::run() return; } - auto const nextSequence = syncCacheWithDb(); + auto const nextSequence = rng->maxSequence + 1; LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; startMonitor(nextSequence); - state_->isLoadingCache = false; + state_->etlStarted = true; // If we are a writer as the result of loading the initial ledger - start loading if (state_->isWriting) @@ -356,24 +356,6 @@ ETLService::loadInitialLedgerIfNeeded() return rng; } -uint32_t -ETLService::syncCacheWithDb() -{ - auto rng = backend_->hardFetchLedgerRangeNoThrow(); - - while (not backend_->cache().isDisabled() and - rng->maxSequence > backend_->cache().latestLedgerSequence()) { - LOG(log_.info()) << "Syncing cache with DB. DB latest seq: " << rng->maxSequence - << ". Cache latest seq: " << backend_->cache().latestLedgerSequence(); - for (auto seq = backend_->cache().latestLedgerSequence(); seq <= rng->maxSequence; ++seq) { - LOG(log_.info()) << "ETLService (via syncCacheWithDb) got new seq from db: " << seq; - updateCache(seq); - } - rng = backend_->hardFetchLedgerRangeNoThrow(); - } - return rng->maxSequence + 1; -} - void ETLService::updateCache(uint32_t seq) { diff --git a/src/etl/SystemState.hpp b/src/etl/SystemState.hpp index 60466a4e1..aa1a75b05 100644 --- a/src/etl/SystemState.hpp +++ b/src/etl/SystemState.hpp @@ -28,6 +28,7 @@ #include #include +#include #include namespace etl { @@ -36,11 +37,6 @@ namespace etl { * @brief Represents the state of the ETL subsystem. */ struct SystemState { - SystemState() - { - isLoadingCache = true; - } - /** * @brief Factory method to create a SystemState instance. * @@ -74,12 +70,8 @@ struct SystemState { "Whether the process is writing to the database" ); - /** @brief Whether the process is still loading cache after startup. */ - util::prometheus::Bool isLoadingCache = PrometheusService::boolMetric( - "etl_loading_cache", - util::prometheus::Labels{}, - "Whether etl is loading cache after clio startup" - ); + /** @brief Shows whether ETL started monitor and ready to become a writer if needed */ + std::atomic_bool etlStarted{false}; /** * @brief Commands for controlling the ETL writer state. diff --git a/src/etl/WriterState.cpp b/src/etl/WriterState.cpp index 8947febd4..28162a5a6 100644 --- a/src/etl/WriterState.cpp +++ b/src/etl/WriterState.cpp @@ -19,6 +19,7 @@ #include "etl/WriterState.hpp" +#include "data/LedgerCacheInterface.hpp" #include "etl/SystemState.hpp" #include @@ -26,7 +27,11 @@ namespace etl { -WriterState::WriterState(std::shared_ptr state) : systemState_(std::move(state)) +WriterState::WriterState( + std::shared_ptr state, + data::LedgerCacheInterface const& cache +) + : systemState_(std::move(state)), cache_(cache) { } @@ -73,9 +78,15 @@ WriterState::isFallback() const } bool -WriterState::isLoadingCache() const +WriterState::isEtlStarted() const { - return systemState_->isLoadingCache; + return systemState_->etlStarted; +} + +bool +WriterState::isCacheFull() const +{ + return cache_.get().isFull(); } std::unique_ptr diff --git a/src/etl/WriterState.hpp b/src/etl/WriterState.hpp index 5fcb22fdc..7ee0d47bf 100644 --- a/src/etl/WriterState.hpp +++ b/src/etl/WriterState.hpp @@ -19,8 +19,10 @@ #pragma once +#include "data/LedgerCacheInterface.hpp" #include "etl/SystemState.hpp" +#include #include namespace etl { @@ -88,12 +90,20 @@ public: setWriterDecidingFallback() = 0; /** - * @brief Whether clio is still loading cache after startup. + * @brief Whether the ETL monitor has started and the node is ready to become a writer. * - * @return true if clio is still loading cache, false otherwise. + * @return true if ETL has started the monitor loop, false otherwise. */ [[nodiscard]] virtual bool - isLoadingCache() const = 0; + isEtlStarted() const = 0; + + /** + * @brief Whether the ledger cache is fully loaded. + * + * @return true if the cache is full, false otherwise. + */ + [[nodiscard]] virtual bool + isCacheFull() const = 0; /** * @brief Create a clone of this writer state. @@ -119,13 +129,16 @@ class WriterState : public WriterStateInterface { private: std::shared_ptr systemState_; /**< @brief Shared system state for ETL coordination */ + std::reference_wrapper cache_; public: /** - * @brief Construct a WriterState with the given system state. + * @brief Construct a WriterState with the given system state and cache. + * * @param state Shared pointer to the system state for coordination + * @param cache The ledger cache used to report cache fullness */ - WriterState(std::shared_ptr state); + WriterState(std::shared_ptr state, data::LedgerCacheInterface const& cache); bool isReadOnly() const override; @@ -172,13 +185,13 @@ public: bool isFallback() const override; - /** - * @brief Whether clio is still loading cache after startup. - * - * @return true if clio is still loading cache, false otherwise. - */ + /** @copydoc WriterStateInterface::isEtlStarted */ bool - isLoadingCache() const override; + isEtlStarted() const override; + + /** @copydoc WriterStateInterface::isCacheFull */ + bool + isCacheFull() const override; /** * @brief Create a clone of this writer state. diff --git a/tests/common/util/MockLedgerCache.hpp b/tests/common/util/MockLedgerCache.hpp index e9084cacc..1d55f37fa 100644 --- a/tests/common/util/MockLedgerCache.hpp +++ b/tests/common/util/MockLedgerCache.hpp @@ -35,7 +35,7 @@ struct MockLedgerCache : data::LedgerCacheInterface { MOCK_METHOD( void, - updateImp, + updateImpl, (std::vector const& a, uint32_t b, bool c), () ); @@ -43,7 +43,7 @@ struct MockLedgerCache : data::LedgerCacheInterface { void update(std::vector const& a, uint32_t b, bool c = false) override { - updateImp(a, b, c); + updateImpl(a, b, c); } MOCK_METHOD( diff --git a/tests/common/util/MockWriterState.hpp b/tests/common/util/MockWriterState.hpp index a1821d599..442273918 100644 --- a/tests/common/util/MockWriterState.hpp +++ b/tests/common/util/MockWriterState.hpp @@ -32,7 +32,8 @@ struct MockWriterStateBase : public etl::WriterStateInterface { MOCK_METHOD(void, giveUpWriting, (), (override)); MOCK_METHOD(void, setWriterDecidingFallback, (), (override)); MOCK_METHOD(bool, isFallback, (), (const, override)); - MOCK_METHOD(bool, isLoadingCache, (), (const, override)); + MOCK_METHOD(bool, isEtlStarted, (), (const, override)); + MOCK_METHOD(bool, isCacheFull, (), (const, override)); MOCK_METHOD(std::unique_ptr, clone, (), (const, override)); }; diff --git a/tests/unit/cluster/BackendTests.cpp b/tests/unit/cluster/BackendTests.cpp index 7af04e470..3cae7f54a 100644 --- a/tests/unit/cluster/BackendTests.cpp +++ b/tests/unit/cluster/BackendTests.cpp @@ -91,6 +91,12 @@ TEST_F(ClusterBackendTest, SubscribeToNewState) EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this]( @@ -127,6 +133,12 @@ TEST_F(ClusterBackendTest, Stop) EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); clusterBackend.run(); std::this_thread::sleep_for(std::chrono::milliseconds{20}); @@ -156,6 +168,12 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataThrowsException) EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly( @@ -184,8 +202,10 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes) auto const otherUuid = boost::uuids::random_generator{}(); auto const otherNodeJson = R"JSON({ - "db_role": 3, - "update_time": "2025-01-15T10:30:00Z" + "db_role": 2, + "update_time": "2025-01-15T10:30:00Z", + "etl_started": false, + "cache_is_full": false })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData) @@ -206,7 +226,10 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes) EXPECT_CALL(writerStateRef, isFallback) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); - EXPECT_CALL(writerStateRef, isLoadingCache) + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isWriting) @@ -257,7 +280,9 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsOnlySelfData) auto const selfNodeJson = R"JSON({ "db_role": 1, - "update_time": "2025-01-16T10:30:00Z" + "update_time": "2025-01-16T10:30:00Z", + "etl_started": false, + "cache_is_full": false })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData).Times(testing::AtLeast(1)).WillRepeatedly([&]() { @@ -271,6 +296,12 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsOnlySelfData) EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this]( @@ -320,6 +351,12 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsInvalidJson) EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this, invalidJson]( @@ -368,6 +405,12 @@ TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsValidJsonButCannotConvertToC EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly( @@ -406,7 +449,10 @@ TEST_F(ClusterBackendTest, WriteNodeMessageWritesSelfDataWithRecentTimestampAndD EXPECT_CALL(writerStateRef, isFallback) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); - EXPECT_CALL(writerStateRef, isLoadingCache) + EXPECT_CALL(writerStateRef, isEtlStarted) + .Times(testing::AtLeast(1)) + .WillRepeatedly(testing::Return(false)); + EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isWriting) diff --git a/tests/unit/cluster/ClioNodeTests.cpp b/tests/unit/cluster/ClioNodeTests.cpp index f0f496227..2b86d577e 100644 --- a/tests/unit/cluster/ClioNodeTests.cpp +++ b/tests/unit/cluster/ClioNodeTests.cpp @@ -51,7 +51,9 @@ TEST_F(ClioNodeTest, Serialization) ClioNode const node{ .uuid = std::make_shared(boost::uuids::random_generator()()), .updateTime = updateTime, - .dbRole = ClioNode::DbRole::Writer + .dbRole = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false }; boost::json::value jsonValue; @@ -66,23 +68,38 @@ TEST_F(ClioNodeTest, Serialization) EXPECT_TRUE(obj.contains("db_role")); EXPECT_TRUE(obj.at("db_role").is_number()); EXPECT_EQ(obj.at("db_role").as_int64(), static_cast(node.dbRole)); + + EXPECT_TRUE(obj.contains("etl_started")); + EXPECT_EQ(obj.at("etl_started").as_bool(), node.etlStarted); + + EXPECT_TRUE(obj.contains("cache_is_full")); + EXPECT_EQ(obj.at("cache_is_full").as_bool(), node.cacheIsFull); } TEST_F(ClioNodeTest, Deserialization) { - boost::json::value const jsonValue = {{"update_time", updateTimeStr}, {"db_role", 1}}; + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, + {"db_role", 1}, + {"etl_started", true}, + {"cache_is_full", false} + }; ClioNode node{ .uuid = std::make_shared(), .updateTime = {}, - .dbRole = ClioNode::DbRole::ReadOnly + .dbRole = ClioNode::DbRole::ReadOnly, + .etlStarted = false, + .cacheIsFull = false }; ASSERT_NO_THROW(node = boost::json::value_to(jsonValue)); EXPECT_NE(node.uuid, nullptr); EXPECT_EQ(*node.uuid, boost::uuids::uuid{}); EXPECT_EQ(node.updateTime, updateTime); - EXPECT_EQ(node.dbRole, ClioNode::DbRole::LoadingCache); + EXPECT_EQ(node.dbRole, ClioNode::DbRole::NotWriter); + EXPECT_TRUE(node.etlStarted); + EXPECT_FALSE(node.cacheIsFull); } TEST_F(ClioNodeTest, DeserializationInvalidTime) @@ -100,6 +117,22 @@ TEST_F(ClioNodeTest, DeserializationMissingTime) EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); } +TEST_F(ClioNodeTest, DeserializationMissingEtlStarted) +{ + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, {"db_role", 1}, {"cache_is_full", false} + }; + EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); +} + +TEST_F(ClioNodeTest, DeserializationMissingCacheIsFull) +{ + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, {"db_role", 1}, {"etl_started", true} + }; + EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); +} + struct ClioNodeDbRoleTestBundle { std::string testName; ClioNode::DbRole role; @@ -112,10 +145,6 @@ INSTANTIATE_TEST_SUITE_P( ClioNodeDbRoleTest, testing::Values( ClioNodeDbRoleTestBundle{.testName = "ReadOnly", .role = ClioNode::DbRole::ReadOnly}, - ClioNodeDbRoleTestBundle{ - .testName = "LoadingCache", - .role = ClioNode::DbRole::LoadingCache - }, ClioNodeDbRoleTestBundle{.testName = "NotWriter", .role = ClioNode::DbRole::NotWriter}, ClioNodeDbRoleTestBundle{.testName = "Writer", .role = ClioNode::DbRole::Writer}, ClioNodeDbRoleTestBundle{.testName = "Fallback", .role = ClioNode::DbRole::Fallback} @@ -129,7 +158,9 @@ TEST_P(ClioNodeDbRoleTest, Serialization) ClioNode const node{ .uuid = std::make_shared(boost::uuids::random_generator()()), .updateTime = updateTime, - .dbRole = param.role + .dbRole = param.role, + .etlStarted = false, + .cacheIsFull = false }; auto const jsonValue = boost::json::value_from(node); EXPECT_EQ(jsonValue.as_object().at("db_role").as_int64(), static_cast(param.role)); @@ -139,7 +170,10 @@ TEST_P(ClioNodeDbRoleTest, Deserialization) { auto const param = GetParam(); boost::json::value const jsonValue = { - {"update_time", updateTimeStr}, {"db_role", static_cast(param.role)} + {"update_time", updateTimeStr}, + {"db_role", static_cast(param.role)}, + {"etl_started", false}, + {"cache_is_full", false} }; auto const node = boost::json::value_to(jsonValue); EXPECT_EQ(node.dbRole, param.role); @@ -147,13 +181,20 @@ TEST_P(ClioNodeDbRoleTest, Deserialization) TEST_F(ClioNodeDbRoleTest, DeserializationInvalidDbRole) { - boost::json::value const jsonValue = {{"update_time", updateTimeStr}, {"db_role", 10}}; + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, + {"db_role", 10}, + {"etl_started", false}, + {"cache_is_full", false} + }; EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); } TEST_F(ClioNodeDbRoleTest, DeserializationMissingDbRole) { - boost::json::value const jsonValue = {{"update_time", updateTimeStr}}; + boost::json::value const jsonValue = { + {"update_time", updateTimeStr}, {"etl_started", false}, {"cache_is_full", false} + }; EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); } @@ -161,8 +202,9 @@ struct ClioNodeFromTestBundle { std::string testName; bool readOnly; bool fallback; - bool loadingCache; bool writing; + bool etlStarted; + bool cacheIsFull; ClioNode::DbRole expectedRole; }; @@ -181,40 +223,36 @@ INSTANTIATE_TEST_SUITE_P( .testName = "ReadOnly", .readOnly = true, .fallback = false, - .loadingCache = false, .writing = false, + .etlStarted = false, + .cacheIsFull = false, .expectedRole = ClioNode::DbRole::ReadOnly }, ClioNodeFromTestBundle{ .testName = "Fallback", .readOnly = false, .fallback = true, - .loadingCache = false, .writing = false, + .etlStarted = false, + .cacheIsFull = false, .expectedRole = ClioNode::DbRole::Fallback }, ClioNodeFromTestBundle{ - .testName = "LoadingCache", + .testName = "NotWriter", .readOnly = false, .fallback = false, - .loadingCache = true, - .writing = false, - .expectedRole = ClioNode::DbRole::LoadingCache - }, - ClioNodeFromTestBundle{ - .testName = "NotWriterNotReadOnly", - .readOnly = false, - .fallback = false, - .loadingCache = false, .writing = false, + .etlStarted = true, + .cacheIsFull = false, .expectedRole = ClioNode::DbRole::NotWriter }, ClioNodeFromTestBundle{ .testName = "Writer", .readOnly = false, .fallback = false, - .loadingCache = false, .writing = true, + .etlStarted = true, + .cacheIsFull = true, .expectedRole = ClioNode::DbRole::Writer } ), @@ -229,13 +267,11 @@ TEST_P(ClioNodeFromTest, FromWriterState) if (not param.readOnly) { EXPECT_CALL(writerState, isFallback()).WillOnce(testing::Return(param.fallback)); if (not param.fallback) { - EXPECT_CALL(writerState, isLoadingCache()) - .WillOnce(testing::Return(param.loadingCache)); - if (not param.loadingCache) { - EXPECT_CALL(writerState, isWriting()).WillOnce(testing::Return(param.writing)); - } + EXPECT_CALL(writerState, isWriting()).WillOnce(testing::Return(param.writing)); } } + EXPECT_CALL(writerState, isEtlStarted()).WillOnce(testing::Return(param.etlStarted)); + EXPECT_CALL(writerState, isCacheFull()).WillOnce(testing::Return(param.cacheIsFull)); auto const beforeTime = std::chrono::system_clock::now(); auto const node = ClioNode::from(uuid, writerState); @@ -243,6 +279,8 @@ TEST_P(ClioNodeFromTest, FromWriterState) EXPECT_EQ(node.uuid, uuid); EXPECT_EQ(node.dbRole, param.expectedRole); + EXPECT_EQ(node.etlStarted, param.etlStarted); + EXPECT_EQ(node.cacheIsFull, param.cacheIsFull); EXPECT_GE(node.updateTime, beforeTime); EXPECT_LE(node.updateTime, afterTime); } diff --git a/tests/unit/cluster/ClusterCommunicationServiceTests.cpp b/tests/unit/cluster/ClusterCommunicationServiceTests.cpp index b62c6e502..e6c7b719e 100644 --- a/tests/unit/cluster/ClusterCommunicationServiceTests.cpp +++ b/tests/unit/cluster/ClusterCommunicationServiceTests.cpp @@ -65,7 +65,9 @@ struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockB return ClioNode{ .uuid = std::make_shared(uuid), .updateTime = std::chrono::system_clock::now(), - .dbRole = role + .dbRole = role, + .etlStarted = true, + .cacheIsFull = true, }; } diff --git a/tests/unit/cluster/MetricsTests.cpp b/tests/unit/cluster/MetricsTests.cpp index 9513e0c85..8f18dc549 100644 --- a/tests/unit/cluster/MetricsTests.cpp +++ b/tests/unit/cluster/MetricsTests.cpp @@ -71,17 +71,23 @@ TEST_F(MetricsTest, OnNewStateWithValidClusterData) ClioNode const node1{ .uuid = uuid1, .updateTime = std::chrono::system_clock::now(), - .dbRole = ClioNode::DbRole::Writer + .dbRole = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = true }; ClioNode const node2{ .uuid = uuid2, .updateTime = std::chrono::system_clock::now(), - .dbRole = ClioNode::DbRole::ReadOnly + .dbRole = ClioNode::DbRole::ReadOnly, + .etlStarted = true, + .cacheIsFull = true }; ClioNode const node3{ .uuid = uuid3, .updateTime = std::chrono::system_clock::now(), - .dbRole = ClioNode::DbRole::NotWriter + .dbRole = ClioNode::DbRole::NotWriter, + .etlStarted = true, + .cacheIsFull = false }; std::vector const nodes = {node1, node2, node3}; @@ -149,7 +155,9 @@ TEST_F(MetricsTest, OnNewStateWithSingleNode) ClioNode const node1{ .uuid = uuid1, .updateTime = std::chrono::system_clock::now(), - .dbRole = ClioNode::DbRole::Writer + .dbRole = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false }; std::vector const nodes = {node1}; @@ -185,12 +193,16 @@ TEST_F(MetricsTest, OnNewStateRecoveryFromFailure) ClioNode const node1{ .uuid = uuid1, .updateTime = std::chrono::system_clock::now(), - .dbRole = ClioNode::DbRole::Writer + .dbRole = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = true }; ClioNode const node2{ .uuid = uuid2, .updateTime = std::chrono::system_clock::now(), - .dbRole = ClioNode::DbRole::ReadOnly + .dbRole = ClioNode::DbRole::ReadOnly, + .etlStarted = true, + .cacheIsFull = false }; std::vector const nodes = {node1, node2}; diff --git a/tests/unit/cluster/WriterDeciderTests.cpp b/tests/unit/cluster/WriterDeciderTests.cpp index 4899eb640..2360c224a 100644 --- a/tests/unit/cluster/WriterDeciderTests.cpp +++ b/tests/unit/cluster/WriterDeciderTests.cpp @@ -39,10 +39,17 @@ using namespace cluster; enum class ExpectedAction { StartWriting, GiveUpWriting, NoAction, SetFallback }; +struct NodeParams { + uint8_t uuidValue; + ClioNode::DbRole role; + bool etlStarted = true; + bool cacheIsFull = true; +}; + struct WriterDeciderTestParams { std::string testName; uint8_t selfUuidValue; - std::vector> nodes; + std::vector nodes; ExpectedAction expectedAction; bool useEmptyClusterData = false; }; @@ -59,12 +66,19 @@ struct WriterDeciderTest : testing::TestWithParam { MockWriterState& writerStateRef = *writerState; static ClioNode - makeNode(boost::uuids::uuid const& uuid, ClioNode::DbRole role) + makeNode( + boost::uuids::uuid const& uuid, + ClioNode::DbRole role, + bool etlStarted, + bool cacheIsFull + ) { return ClioNode{ .uuid = std::make_shared(uuid), .updateTime = std::chrono::system_clock::now(), - .dbRole = role + .dbRole = role, + .etlStarted = etlStarted, + .cacheIsFull = cacheIsFull }; } @@ -125,9 +139,14 @@ TEST_P(WriterDeciderTest, WriterSelection) } else { std::vector nodes; nodes.reserve(params.nodes.size()); - for (auto const& [uuidValue, role] : params.nodes) { - auto node = makeNode(makeUuid(uuidValue), role); - if (uuidValue == params.selfUuidValue) { + for (auto const& nodeParam : params.nodes) { + auto node = makeNode( + makeUuid(nodeParam.uuidValue), + nodeParam.role, + nodeParam.etlStarted, + nodeParam.cacheIsFull + ); + if (nodeParam.uuidValue == params.selfUuidValue) { selfIdPtr = node.uuid; // Use the same shared_ptr as in the node } nodes.push_back(std::move(node)); @@ -276,47 +295,111 @@ INSTANTIATE_TEST_SUITE_P( {0x04, ClioNode::DbRole::Writer}}, .expectedAction = ExpectedAction::SetFallback }, + // Tests for etlStarted / cacheIsFull election logic WriterDeciderTestParams{ - .testName = "SelfIsLoadingCacheOtherIsWriter", - .selfUuidValue = 0x01, - .nodes = {{0x01, ClioNode::DbRole::LoadingCache}, {0x02, ClioNode::DbRole::Writer}}, - .expectedAction = ExpectedAction::GiveUpWriting - }, - WriterDeciderTestParams{ - .testName = "OtherNodeIsLoadingCacheSkipToNextWriter", + .testName = "EtlNotStartedNodeSkipped_CacheFullNodeSelected", .selfUuidValue = 0x02, .nodes = - {{0x01, ClioNode::DbRole::LoadingCache}, - {0x02, ClioNode::DbRole::Writer}, - {0x03, ClioNode::DbRole::NotWriter}}, + {{.uuidValue = 0x01, + .role = ClioNode::DbRole::Writer, + .etlStarted = false, + .cacheIsFull = true}, + {.uuidValue = 0x02, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = true}, + {.uuidValue = 0x03, + .role = ClioNode::DbRole::NotWriter, + .etlStarted = true, + .cacheIsFull = true}}, .expectedAction = ExpectedAction::StartWriting }, WriterDeciderTestParams{ - .testName = "AllNodesLoadingCacheNoActionTaken", + .testName = "AllNodesEtlNotStarted_GiveUpWriting", .selfUuidValue = 0x01, .nodes = - {{0x01, ClioNode::DbRole::LoadingCache}, {0x02, ClioNode::DbRole::LoadingCache}}, - .expectedAction = ExpectedAction::NoAction + {{.uuidValue = 0x01, + .role = ClioNode::DbRole::Writer, + .etlStarted = false, + .cacheIsFull = false}, + {.uuidValue = 0x02, + .role = ClioNode::DbRole::Writer, + .etlStarted = false, + .cacheIsFull = false}}, + .expectedAction = ExpectedAction::GiveUpWriting }, WriterDeciderTestParams{ - .testName = "MixedWithLoadingCacheReadOnlyFirstNonReadOnlyNonLoadingCacheSelected", - .selfUuidValue = 0x03, + .testName = "CacheNotFullFallsBackToEtlStartedSelection_SelfSelected", + .selfUuidValue = 0x01, .nodes = - {{0x01, ClioNode::DbRole::ReadOnly}, - {0x02, ClioNode::DbRole::LoadingCache}, - {0x03, ClioNode::DbRole::Writer}, - {0x04, ClioNode::DbRole::NotWriter}}, + {{.uuidValue = 0x01, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false}, + {.uuidValue = 0x02, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false}}, .expectedAction = ExpectedAction::StartWriting }, WriterDeciderTestParams{ - .testName = "LoadingCacheBeforeWriterSkipsLoadingCache", + .testName = "CacheFullNodePreferredOverCacheNotFullNode", + .selfUuidValue = 0x02, + .nodes = + {{.uuidValue = 0x01, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false}, + {.uuidValue = 0x02, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = true}, + {.uuidValue = 0x03, + .role = ClioNode::DbRole::NotWriter, + .etlStarted = true, + .cacheIsFull = false}}, + .expectedAction = ExpectedAction::StartWriting + }, + WriterDeciderTestParams{ + .testName = "CacheFullNodePreferredEvenIfHigherUuid", .selfUuidValue = 0x04, .nodes = - {{0x01, ClioNode::DbRole::LoadingCache}, - {0x02, ClioNode::DbRole::LoadingCache}, - {0x03, ClioNode::DbRole::Writer}, - {0x04, ClioNode::DbRole::NotWriter}}, + {{.uuidValue = 0x01, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false}, + {.uuidValue = 0x02, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = false}, + {.uuidValue = 0x03, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = true}, + {.uuidValue = 0x04, + .role = ClioNode::DbRole::NotWriter, + .etlStarted = true, + .cacheIsFull = true}}, .expectedAction = ExpectedAction::GiveUpWriting + }, + WriterDeciderTestParams{ + .testName = "MixedWithReadOnly_EtlNotStartedNodeSkipped", + .selfUuidValue = 0x03, + .nodes = + {{.uuidValue = 0x01, .role = ClioNode::DbRole::ReadOnly}, + {.uuidValue = 0x02, + .role = ClioNode::DbRole::Writer, + .etlStarted = false, + .cacheIsFull = false}, + {.uuidValue = 0x03, + .role = ClioNode::DbRole::Writer, + .etlStarted = true, + .cacheIsFull = true}, + {.uuidValue = 0x04, + .role = ClioNode::DbRole::NotWriter, + .etlStarted = true, + .cacheIsFull = true}}, + .expectedAction = ExpectedAction::StartWriting } ), [](testing::TestParamInfo const& info) { return info.param.testName; } diff --git a/tests/unit/etl/CacheLoaderTests.cpp b/tests/unit/etl/CacheLoaderTests.cpp index e91222a05..c0e9d9a85 100644 --- a/tests/unit/etl/CacheLoaderTests.cpp +++ b/tests/unit/etl/CacheLoaderTests.cpp @@ -209,7 +209,7 @@ TEST_P(ParametrizedCacheLoaderTest, LoadCacheWithDifferentSettings) .WillRepeatedly(Return(std::vector(keysSize - 1, Blob{'s'}))); EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false)); - EXPECT_CALL(cache, updateImp).Times(loops); + EXPECT_CALL(cache, updateImpl).Times(loops); EXPECT_CALL(cache, setFull).Times(1); async::CoroExecutionContext ctx{settings.numThreads}; @@ -244,7 +244,7 @@ TEST_P(ParametrizedCacheLoaderTest, AutomaticallyCancelledAndAwaitedInDestructor .WillRepeatedly(Return(std::vector(keysSize - 1, Blob{'s'}))); EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false)); - EXPECT_CALL(cache, updateImp).Times(AtMost(loops)); + EXPECT_CALL(cache, updateImpl).Times(AtMost(loops)); EXPECT_CALL(cache, setFull).Times(AtMost(1)); async::CoroExecutionContext ctx{settings.numThreads}; @@ -279,7 +279,7 @@ TEST_P(ParametrizedCacheLoaderTest, CacheDisabledLeadsToCancellation) .WillRepeatedly(Return(std::vector(keysSize - 1, Blob{'s'}))); EXPECT_CALL(cache, isDisabled).WillOnce(Return(false)).WillRepeatedly(Return(true)); - EXPECT_CALL(cache, updateImp).Times(AtMost(1)); + EXPECT_CALL(cache, updateImpl).Times(AtMost(1)); EXPECT_CALL(cache, setFull).Times(0); async::CoroExecutionContext ctx{settings.numThreads}; @@ -320,7 +320,7 @@ TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded) .WillRepeatedly(Return(std::vector{keysSize - 1, Blob{'s'}})); EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false)); - EXPECT_CALL(cache, updateImp).Times(loops); + EXPECT_CALL(cache, updateImpl).Times(loops); EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true)); EXPECT_CALL(cache, setFull).Times(1); @@ -346,7 +346,7 @@ TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped) .WillRepeatedly(Return(std::vector{keysSize - 1, Blob{'s'}})); EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false)); - EXPECT_CALL(cache, updateImp).Times(AtMost(loops)); + EXPECT_CALL(cache, updateImpl).Times(AtMost(loops)); EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false)); EXPECT_CALL(cache, setFull).Times(AtMost(1)); @@ -360,7 +360,7 @@ TEST_F(CacheLoaderTest, DisabledCacheLoaderDoesNotLoadCache) auto const cfg = getParseCacheConfig(json::parse(R"JSON({"cache": {"load": "none"}})JSON")); CacheLoader loader{cfg, backend_, cache}; - EXPECT_CALL(cache, updateImp).Times(0); + EXPECT_CALL(cache, updateImpl).Times(0); EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false)); EXPECT_CALL(cache, setDisabled).Times(1); @@ -372,7 +372,7 @@ TEST_F(CacheLoaderTest, DisabledCacheLoaderCanCallStopAndWait) auto const cfg = getParseCacheConfig(json::parse(R"JSON({"cache": {"load": "none"}})JSON")); CacheLoader loader{cfg, backend_, cache}; - EXPECT_CALL(cache, updateImp).Times(0); + EXPECT_CALL(cache, updateImpl).Times(0); EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false)); EXPECT_CALL(cache, setDisabled).Times(1); @@ -410,11 +410,12 @@ TEST_F(CacheLoaderFromFileTest, Success) EXPECT_CALL(cache, loadFromFile(filePath, kSEQ - maxSequenceLag)) .WillOnce(Return(std::expected{})); EXPECT_CALL(cache, latestLedgerSequence).WillOnce(Return(kLOADED_SEQ)); + EXPECT_CALL(cache, setFull); loader.load(kSEQ); std::optional const expectedLedgerRange = - LedgerRange{.minSequence = kSEQ - 20, .maxSequence = kLOADED_SEQ}; + LedgerRange{.minSequence = kSEQ - 20, .maxSequence = kSEQ}; EXPECT_EQ(backend_->fetchLedgerRange(), expectedLedgerRange); } @@ -437,7 +438,7 @@ TEST_F(CacheLoaderFromFileTest, FailureBackToNormalLoad) .WillRepeatedly(Return(std::vector{keysSize - 1, Blob{'s'}})); EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false)); - EXPECT_CALL(cache, updateImp).Times(loops); + EXPECT_CALL(cache, updateImpl).Times(loops); EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true)); EXPECT_CALL(cache, setFull).Times(1); @@ -469,6 +470,32 @@ TEST_F(CacheLoaderFromFileTest, MaxSequenceLagCalculation) loader.load(kSEQ); } +TEST_F(CacheLoaderFromFileTest, FileSequenceBehindBackendFetchesMissingLedgersFromDB) +{ + constexpr uint32_t kFILE_SEQ = kSEQ - 2; + auto const diffs = diffProvider.getLatestDiff(); + + EXPECT_CALL(cache, isFull).WillOnce(Return(false)); + EXPECT_CALL(cache, loadFromFile(filePath, kSEQ - maxSequenceLag)) + .WillOnce(Return(std::expected{})); + + // latestLedgerSequence is called twice per loop iteration (condition + seqToLoad + 1) + // plus once for the final exit check + EXPECT_CALL(cache, latestLedgerSequence) + .WillOnce(Return(kFILE_SEQ)) // iteration 1: condition (true) + .WillOnce(Return(kFILE_SEQ)) // iteration 1: seqToLoad + 1 = kFILE_SEQ + 1 + .WillOnce(Return(kFILE_SEQ + 1)) // iteration 2: condition (true) + .WillOnce(Return(kFILE_SEQ + 1)) // iteration 2: seqToLoad + 1 = kFILE_SEQ + 2 + .WillOnce(Return(kSEQ)); // exit condition (false) + + EXPECT_CALL(*backend_, fetchLedgerDiff(kFILE_SEQ + 1, _)).WillOnce(Return(diffs)); + EXPECT_CALL(*backend_, fetchLedgerDiff(kFILE_SEQ + 2, _)).WillOnce(Return(diffs)); + EXPECT_CALL(cache, updateImpl).Times(2); + EXPECT_CALL(cache, setFull).Times(1); + + loader.load(kSEQ); +} + TEST_F(CacheLoaderFromFileTest, MaxSequenceLagClampedToMinOfLedgerRange) { uint32_t const currentSeq = 110; diff --git a/tests/unit/etl/ETLServiceTests.cpp b/tests/unit/etl/ETLServiceTests.cpp index 883266755..a4642e822 100644 --- a/tests/unit/etl/ETLServiceTests.cpp +++ b/tests/unit/etl/ETLServiceTests.cpp @@ -325,7 +325,7 @@ TEST_F(ETLServiceTests, RunWithEmptyDatabase) auto mockTaskManager = std::make_unique>(); auto& mockTaskManagerRef = *mockTaskManager; auto ledgerData = createTestData(kSEQ); - EXPECT_TRUE(systemState_->isLoadingCache); + EXPECT_FALSE(systemState_->etlStarted); testing::Sequence const s; EXPECT_CALL(*backend_, hardFetchLedgerRange) @@ -336,20 +336,19 @@ TEST_F(ETLServiceTests, RunWithEmptyDatabase) EXPECT_CALL(*balancer_, loadInitialLedger(kSEQ, testing::_, testing::_)) .WillOnce(testing::Return(std::vector{})); EXPECT_CALL(*loader_, loadInitialLedger).WillOnce(testing::Return(ripple::LedgerHeader{})); - // In syncCacheWithDb() - EXPECT_CALL(*backend_, hardFetchLedgerRange).Times(2).InSequence(s).WillRepeatedly([this]() { - backend_->cache().update({}, kSEQ, false); - return data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}; - }); + EXPECT_CALL(*backend_, hardFetchLedgerRange) + .Times(1) + .InSequence(s) + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(mockTaskManagerRef, run); EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_)) .WillOnce([&](auto&&...) { - EXPECT_FALSE(systemState_->isLoadingCache); + EXPECT_TRUE(systemState_->etlStarted); return std::unique_ptr(mockTaskManager.release()); }); EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, kSEQ + 1, testing::_)) .WillOnce([this](auto, auto, auto, auto, auto) { - EXPECT_TRUE(systemState_->isLoadingCache); + EXPECT_FALSE(systemState_->etlStarted); return std::make_unique>(); }); @@ -358,13 +357,13 @@ TEST_F(ETLServiceTests, RunWithEmptyDatabase) TEST_F(ETLServiceTests, RunWithPopulatedDatabase) { - EXPECT_TRUE(systemState_->isLoadingCache); + EXPECT_FALSE(systemState_->etlStarted); backend_->cache().update({}, kSEQ, false); EXPECT_CALL(*backend_, hardFetchLedgerRange) .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, kSEQ + 1, testing::_)) .WillOnce([this](auto, auto, auto, auto, auto) { - EXPECT_TRUE(systemState_->isLoadingCache); + EXPECT_FALSE(systemState_->etlStarted); return std::make_unique>(); }); EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); @@ -375,21 +374,14 @@ TEST_F(ETLServiceTests, RunWithPopulatedDatabase) TEST_F(ETLServiceTests, SyncCacheWithDbBeforeStartingMonitor) { - EXPECT_TRUE(systemState_->isLoadingCache); + EXPECT_FALSE(systemState_->etlStarted); backend_->cache().update({}, kSEQ - 2, false); EXPECT_CALL(*backend_, hardFetchLedgerRange) .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); - EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ - 1, testing::_)); - EXPECT_CALL(*cacheUpdater_, update(kSEQ - 1, std::vector())) - .WillOnce([this](auto const seq, auto&&...) { backend_->cache().update({}, seq, false); }); - EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ, testing::_)); - EXPECT_CALL(*cacheUpdater_, update(kSEQ, std::vector())) - .WillOnce([this](auto const seq, auto&&...) { backend_->cache().update({}, seq, false); }); - EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, kSEQ + 1, testing::_)) .WillOnce([this](auto, auto, auto, auto, auto) { - EXPECT_TRUE(systemState_->isLoadingCache); + EXPECT_FALSE(systemState_->etlStarted); return std::make_unique>(); }); EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); @@ -433,7 +425,6 @@ TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription) // Set cache to be in sync with DB to avoid syncCacheWithDb loop backend_->cache().update({}, kSEQ, false); EXPECT_CALL(*backend_, hardFetchLedgerRange) - .Times(2) .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); EXPECT_CALL(*cacheLoader_, load(kSEQ)); @@ -470,7 +461,6 @@ TEST_F(ETLServiceTests, NormalFlowInMonitorSubscription) // Set cache to be in sync with DB to avoid syncCacheWithDb loop backend_->cache().update({}, kSEQ, false); EXPECT_CALL(*backend_, hardFetchLedgerRange) - .Times(2) .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); EXPECT_CALL(*cacheLoader_, load(kSEQ)); @@ -565,7 +555,6 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict) // Set cache to be in sync with DB to avoid syncCacheWithDb loop backend_->cache().update({}, kSEQ, false); EXPECT_CALL(*backend_, hardFetchLedgerRange) - .Times(2) .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); EXPECT_CALL(*cacheLoader_, load(kSEQ)); @@ -782,13 +771,8 @@ TEST_F(ETLServiceTests, OnlyCacheUpdatesWhenBackendIsCurrent) EXPECT_CALL(mockMonitorRef, subscribeToDbStalled); EXPECT_CALL(mockMonitorRef, run); - // Set backend range to be at kSEQ + 1 (already current) EXPECT_CALL(*backend_, hardFetchLedgerRange) - .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})) - .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})) - .WillRepeatedly( - testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ + 1}) - ); + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); EXPECT_CALL(*cacheLoader_, load(kSEQ)); @@ -831,13 +815,8 @@ TEST_F(ETLServiceTests, NoUpdatesWhenBothCacheAndBackendAreCurrent) EXPECT_CALL(mockMonitorRef, subscribeToDbStalled); EXPECT_CALL(mockMonitorRef, run); - // Set backend range to be at kSEQ + 1 (already current) EXPECT_CALL(*backend_, hardFetchLedgerRange) - .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})) - .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})) - .WillRepeatedly( - testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ + 1}) - ); + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); EXPECT_CALL(*cacheLoader_, load(kSEQ)); diff --git a/tests/unit/etl/SystemStateTests.cpp b/tests/unit/etl/SystemStateTests.cpp index 5705ddbe9..7b4cf008a 100644 --- a/tests/unit/etl/SystemStateTests.cpp +++ b/tests/unit/etl/SystemStateTests.cpp @@ -42,7 +42,7 @@ TEST_F(SystemStateTest, InitialValuesAreCorrect) EXPECT_FALSE(state.isStrictReadonly); EXPECT_FALSE(state.isWriting); - EXPECT_TRUE(state.isLoadingCache); + EXPECT_FALSE(state.etlStarted); EXPECT_FALSE(state.isAmendmentBlocked); EXPECT_FALSE(state.isCorruptionDetected); EXPECT_FALSE(state.isWriterDecidingFallback); @@ -66,7 +66,7 @@ TEST_P(SystemStateReadOnlyTest, MakeSystemStateWithReadOnly) EXPECT_EQ(state->isStrictReadonly, readOnlyValue); EXPECT_FALSE(state->isWriting); - EXPECT_TRUE(state->isLoadingCache); + EXPECT_FALSE(state->etlStarted); EXPECT_FALSE(state->isAmendmentBlocked); EXPECT_FALSE(state->isCorruptionDetected); EXPECT_FALSE(state->isWriterDecidingFallback); diff --git a/tests/unit/etl/WriterStateTests.cpp b/tests/unit/etl/WriterStateTests.cpp index 872d578fd..a7555e978 100644 --- a/tests/unit/etl/WriterStateTests.cpp +++ b/tests/unit/etl/WriterStateTests.cpp @@ -19,6 +19,7 @@ #include "etl/SystemState.hpp" #include "etl/WriterState.hpp" +#include "util/MockLedgerCache.hpp" #include "util/MockPrometheus.hpp" #include @@ -32,7 +33,8 @@ using namespace testing; struct WriterStateTest : util::prometheus::WithPrometheus { std::shared_ptr systemState = std::make_shared(); StrictMock> mockWriteCommand; - WriterState writerState{systemState}; + NiceMock cache; + WriterState writerState{systemState, cache}; WriterStateTest() { @@ -117,27 +119,36 @@ TEST_F(WriterStateTest, IsReadOnlyReturnsSystemStateValue) EXPECT_TRUE(writerState.isReadOnly()); } -TEST_F(WriterStateTest, IsLoadingCacheReturnsSystemStateValue) +TEST_F(WriterStateTest, IsEtlStartedReturnsSystemStateValue) { - systemState->isLoadingCache = false; - EXPECT_FALSE(writerState.isLoadingCache()); + systemState->etlStarted = false; + EXPECT_FALSE(writerState.isEtlStarted()); - systemState->isLoadingCache = true; - EXPECT_TRUE(writerState.isLoadingCache()); + systemState->etlStarted = true; + EXPECT_TRUE(writerState.isEtlStarted()); +} + +TEST_F(WriterStateTest, IsCacheFullReturnsCacheValue) +{ + EXPECT_CALL(cache, isFull()).WillOnce(Return(false)); + EXPECT_FALSE(writerState.isCacheFull()); + + EXPECT_CALL(cache, isFull()).WillOnce(Return(true)); + EXPECT_TRUE(writerState.isCacheFull()); } TEST_F(WriterStateTest, CloneCreatesNewInstanceWithSameSystemState) { systemState->isWriting = true; systemState->isStrictReadonly = true; - systemState->isLoadingCache = false; + systemState->etlStarted = false; auto cloned = writerState.clone(); ASSERT_NE(cloned.get(), &writerState); EXPECT_TRUE(cloned->isWriting()); EXPECT_TRUE(cloned->isReadOnly()); - EXPECT_FALSE(cloned->isLoadingCache()); + EXPECT_FALSE(cloned->isEtlStarted()); } TEST_F(WriterStateTest, ClonedInstanceSharesSystemState)