// Unit tests for cluster::Backend, driven entirely through mocked collaborators.
//
// NOTE(review): this copy of the file appears to have lost its line breaks and
// the contents of angle brackets (bare `#include` directives, `std::unique_ptr`
// and `std::make_unique` without template arguments). As a side effect the two
// `//` comments further down ("Wait to make sure ..." and "Valid JSON but ...")
// now run to the end of their physical line and swallow the code that follows
// them. The code below is left untouched; restore the original formatting
// before trying to build it.
//
// Fixture `ClusterBackendTest`
//   - Inherits util::prometheus::WithPrometheus and MockBackendTestStrict. The
//     tests use `backend_`, which is not declared in this file (presumably it is
//     provided by MockBackendTestStrict - confirm).
//   - `ctx` is a boost::asio::thread_pool handed to every Backend under test;
//     the fixture destructor stops and joins it.
//   - `writerState` / `cacheLoadingState` are created up front and moved into
//     the Backend by each test. `writerStateRef` is bound before that move so
//     expectations can still be set on the writer-state mock afterwards; tests
//     that need the cache-loading mock take a local `cacheLoadingStateRef`
//     before the move for the same reason.
//   - `callbackMock` is a StrictMock MockFunction that is passed to
//     Backend::subscribeToNewState via AsStdFunction().
//   - `semaphore` starts at 0. `SemaphoreReleaseGuard` releases it from its
//     destructor, so a mocked action that creates a guard first signals the
//     test thread on every way out of the lambda, including when an ASSERT_*
//     leaves it early.
//
// Shape of every test except `Stop`
//   1. Construct a Backend from ctx, backend_, the two moved state objects and
//      two std::chrono::milliseconds(1) values (their meaning is not visible
//      here - presumably polling intervals; confirm against Backend).
//   2. Install AtLeast(1) expectations on the mocks.
//   3. Call clusterBackend.run(), then semaphore.acquire() to block the test
//      thread until a mocked action has run its checks and released the
//      semaphore.
//
// Tests
//   SubscribeToNewState
//       fetchClioNodesData returns a default-constructed
//       ClioNodesDataFetchResult and the writer state reports read-only. The
//       subscriber must receive exactly one node whose uuid equals selfId,
//       whose dbRole is ReadOnly and whose updateTime is not in the future.
//   Stop
//       Runs the backend for 20 ms, stops it, verifies and clears the
//       expectations on backend_, then waits another 20 ms so that any backend
//       call made after stop() would reach a mock with no expectations left
//       (presumably reported as a failure by the strict mock - confirm).
//   FetchClioNodesDataThrowsException
//       fetchClioNodesData throws std::runtime_error. The subscriber must
//       receive an error equal to "Failed to fetch Clio nodes data".
//   FetchClioNodesDataReturnsDataWithOtherNodes
//       fetchClioNodesData returns one foreign node with "db_role": 2 while the
//       writer state reports not read-only, not fallback and not writing. The
//       subscriber must see two nodes: itself as NotWriter and the foreign
//       node as Writer.
//   FetchClioNodesDataReturnsOnlySelfData
//       fetchClioNodesData returns an entry keyed by the backend's own id. The
//       subscriber must see exactly one node with uuid == selfId and dbRole
//       ReadOnly.
//   FetchClioNodesDataReturnsInvalidJson
//       The fetched payload is not valid JSON. The subscriber's error must
//       contain "Error parsing json from DB" and the offending payload.
//   FetchClioNodesDataReturnsValidJsonButCannotConvertToClioNode
//       The payload is valid JSON but db_role is a string. The subscriber's
//       error must contain "Error converting json to ClioNode".
//   WriteNodeMessageWritesSelfDataWithRecentTimestampAndDbRole
//       Inspects the arguments of writeNodeMessage: the uuid must equal the
//       backend's selfId and the message must parse to a node with dbRole
//       NotWriter and an updateTime between `beforeRun` and the moment the
//       write is observed.
//   WriteNodeMessageReflectsCacheIsCurrentlyLoading
//       isCurrentlyLoading returns true; the written message must parse to a
//       node with cacheIsCurrentlyLoading set.
//   SubscribeToNewStateReflectsCacheIsCurrentlyLoading
//       Same condition observed through the subscriber: exactly one node, uuid
//       == selfId, cacheIsCurrentlyLoading set.
//
// NOTE(review): the signalling actions are installed with WillRepeatedly and
// release the semaphore on every invocation, while each test acquires only
// once. std::binary_semaphore::release() requires the internal counter to stay
// within max() (1 for a binary semaphore), so a second invocation before the
// test body returns may violate that precondition - worth confirming, and a
// one-shot signal may be the safer choice.
#include "cluster/Backend.hpp" #include "cluster/ClioNode.hpp" #include "data/BackendInterface.hpp" #include "util/MockBackendTestFixture.hpp" #include "util/MockLedgerCacheLoadingState.hpp" #include "util/MockPrometheus.hpp" #include "util/MockWriterState.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace cluster; struct ClusterBackendTest : util::prometheus::WithPrometheus, MockBackendTestStrict { ~ClusterBackendTest() override { ctx.stop(); ctx.join(); } boost::asio::thread_pool ctx; std::unique_ptr writerState = std::make_unique(); MockWriterState& writerStateRef = *writerState; std::unique_ptr cacheLoadingState = std::make_unique(); testing::StrictMock< testing::MockFunction)>> callbackMock; std::binary_semaphore semaphore{0}; class SemaphoreReleaseGuard { std::binary_semaphore& semaphore_; public: SemaphoreReleaseGuard(std::binary_semaphore& s) : semaphore_(s) { } ~SemaphoreReleaseGuard() { semaphore_.release(); } }; }; TEST_F(ClusterBackendTest, SubscribeToNewState) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this]( ClioNode::CUuid selfId, std::shared_ptr 
clusterData ) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_TRUE(clusterData->has_value()); EXPECT_EQ(clusterData->value().size(), 1); auto const& nodeData = clusterData->value().front(); EXPECT_EQ(nodeData.uuid, selfId); EXPECT_EQ(nodeData.dbRole, ClioNode::DbRole::ReadOnly); EXPECT_LE(nodeData.updateTime, std::chrono::system_clock::now()); }); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, Stop) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); clusterBackend.run(); std::this_thread::sleep_for(std::chrono::milliseconds{20}); clusterBackend.stop(); testing::Mock::VerifyAndClearExpectations(&(*backend_)); // Wait to make sure there is no new calls of mockDbBackend std::this_thread::sleep_for(std::chrono::milliseconds{20}); } TEST_F(ClusterBackendTest, FetchClioNodesDataThrowsException) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Throw(std::runtime_error("Database connection failed"))); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) 
.Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly( [this](ClioNode::CUuid, std::shared_ptr clusterData) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_FALSE(clusterData->has_value()); EXPECT_EQ(clusterData->error(), "Failed to fetch Clio nodes data"); } ); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); auto const otherUuid = boost::uuids::random_generator{}(); auto const otherNodeJson = R"JSON({ "db_role": 2, "update_time": "2025-01-15T10:30:00Z", "etl_started": false, "cache_is_full": false, "cache_is_currently_loading": false })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly( testing::Return( BackendInterface::ClioNodesDataFetchResult{ std::vector>{ {otherUuid, otherNodeJson} } } ) ); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isFallback) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isFallbackRecovery) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); 
EXPECT_CALL(writerStateRef, isWriting) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([&](ClioNode::CUuid selfId, std::shared_ptr clusterData) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_TRUE(clusterData->has_value()) << clusterData->error(); EXPECT_EQ(clusterData->value().size(), 2); EXPECT_EQ(selfId, clusterBackend.selfId()); bool foundSelf = false; bool foundOther = false; for (auto const& node : clusterData->value()) { if (*node.uuid == *selfId) { foundSelf = true; EXPECT_EQ(node.dbRole, ClioNode::DbRole::NotWriter); } else if (*node.uuid == otherUuid) { foundOther = true; EXPECT_EQ(node.dbRole, ClioNode::DbRole::Writer); } EXPECT_LE(node.updateTime, std::chrono::system_clock::now()); } EXPECT_TRUE(foundSelf); EXPECT_TRUE(foundOther); }); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsOnlySelfData) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); auto const selfNodeJson = R"JSON({ "db_role": 1, "update_time": "2025-01-16T10:30:00Z", "etl_started": false, "cache_is_full": false, "cache_is_currently_loading": false })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData).Times(testing::AtLeast(1)).WillRepeatedly([&]() { return BackendInterface::ClioNodesDataFetchResult{ std::vector>{ {*clusterBackend.selfId(), selfNodeJson} } }; }); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); 
EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this]( ClioNode::CUuid selfId, std::shared_ptr clusterData ) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_TRUE(clusterData->has_value()); EXPECT_EQ(clusterData->value().size(), 1); auto const& nodeData = clusterData->value().front(); EXPECT_EQ(nodeData.uuid, selfId); EXPECT_EQ(nodeData.dbRole, ClioNode::DbRole::ReadOnly); EXPECT_LE(nodeData.updateTime, std::chrono::system_clock::now()); }); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsInvalidJson) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); auto const otherUuid = boost::uuids::random_generator{}(); auto const invalidJson = "{ invalid json"; EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly( testing::Return( BackendInterface::ClioNodesDataFetchResult{ std::vector>{ {otherUuid, invalidJson} } } ) ); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this, invalidJson]( ClioNode::CUuid, std::shared_ptr clusterData ) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_FALSE(clusterData->has_value()); EXPECT_THAT(clusterData->error(), testing::HasSubstr("Error parsing json from DB")); EXPECT_THAT(clusterData->error(), testing::HasSubstr(invalidJson)); }); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, 
FetchClioNodesDataReturnsValidJsonButCannotConvertToClioNode) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); auto const otherUuid = boost::uuids::random_generator{}(); // Valid JSON but db_role has wrong type (string instead of integer) auto const validJsonMissingField = R"JSON({ "update_time": "2025-01-16T10:30:00Z", "db_role": "writer" })JSON"; EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly( testing::Return( BackendInterface::ClioNodesDataFetchResult{ std::vector>{ {otherUuid, validJsonMissingField} } } ) ); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly( [this](ClioNode::CUuid, std::shared_ptr clusterData) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_FALSE(clusterData->has_value()); EXPECT_THAT( clusterData->error(), testing::HasSubstr("Error converting json to ClioNode") ); } ); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, WriteNodeMessageWritesSelfDataWithRecentTimestampAndDbRole) { Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; auto const beforeRun = std::chrono::floor(std::chrono::system_clock::now()); EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); EXPECT_CALL(writerStateRef, 
isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isFallback) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isFallbackRecovery) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isWriting) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(*backend_, writeNodeMessage) .Times(testing::AtLeast(1)) .WillRepeatedly([&](boost::uuids::uuid const& uuid, std::string message) { SemaphoreReleaseGuard const guard{semaphore}; auto const afterWrite = std::chrono::system_clock::now(); EXPECT_EQ(uuid, *clusterBackend.selfId()); auto const json = boost::json::parse(message); auto const node = boost::json::try_value_to(json); ASSERT_TRUE(node.has_value()); EXPECT_EQ(node->dbRole, ClioNode::DbRole::NotWriter); EXPECT_GE(node->updateTime, beforeRun); EXPECT_LE(node->updateTime, afterWrite); }); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, WriteNodeMessageReflectsCacheIsCurrentlyLoading) { auto& cacheLoadingStateRef = *cacheLoadingState; Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) 
.WillRepeatedly(testing::Return(false)); EXPECT_CALL(cacheLoadingStateRef, isCurrentlyLoading) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(*backend_, writeNodeMessage) .Times(testing::AtLeast(1)) .WillRepeatedly([&](boost::uuids::uuid const&, std::string message) { SemaphoreReleaseGuard const guard{semaphore}; auto const json = boost::json::parse(message); auto const node = boost::json::try_value_to(json); ASSERT_TRUE(node.has_value()); EXPECT_TRUE(node->cacheIsCurrentlyLoading); }); clusterBackend.run(); semaphore.acquire(); } TEST_F(ClusterBackendTest, SubscribeToNewStateReflectsCacheIsCurrentlyLoading) { auto& cacheLoadingStateRef = *cacheLoadingState; Backend clusterBackend{ ctx, backend_, std::move(writerState), std::move(cacheLoadingState), std::chrono::milliseconds(1), std::chrono::milliseconds(1) }; clusterBackend.subscribeToNewState(callbackMock.AsStdFunction()); EXPECT_CALL(*backend_, fetchClioNodesData) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{})); EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1)); EXPECT_CALL(writerStateRef, isReadOnly) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(writerStateRef, isEtlStarted) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(writerStateRef, isCacheFull) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(false)); EXPECT_CALL(cacheLoadingStateRef, isCurrentlyLoading) .Times(testing::AtLeast(1)) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(callbackMock, Call) .Times(testing::AtLeast(1)) .WillRepeatedly([this]( ClioNode::CUuid selfId, std::shared_ptr clusterData ) { SemaphoreReleaseGuard const guard{semaphore}; ASSERT_TRUE(clusterData->has_value()); ASSERT_EQ(clusterData->value().size(), 1); auto const& selfNode = clusterData->value().front(); EXPECT_EQ(selfNode.uuid, selfId); EXPECT_TRUE(selfNode.cacheIsCurrentlyLoading); }); 
clusterBackend.run(); semaphore.acquire(); }