#include "cluster/ClioNode.hpp"
#include "cluster/ClusterCommunicationService.hpp"
#include "data/BackendInterface.hpp"
#include "etl/SystemState.hpp"
#include "util/MockBackendTestFixture.hpp"
#include "util/MockLedgerCacheLoadingState.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockWriterState.hpp"
#include "util/NameGenerator.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
#include "util/config/Types.hpp"
#include "util/prometheus/Prometheus.hpp"

#include <boost/json/serialize.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_from.hpp>
#include <boost/uuid/uuid.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <semaphore>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using namespace cluster;

namespace {

/**
 * @brief One-shot, thread-safe signal used by the tests to wait for a callback fired by the service.
 *
 * The service invokes the mocked callbacks repeatedly (every kSHORT_INTERVAL), but
 * std::binary_semaphore::release() must not be called while the semaphore counter is already 1.
 * Only the first notify() releases the semaphore; every later call is a no-op.
 */
class OneShotSignal {
public:
    /// Signals waiters the first time it is called; subsequent calls do nothing.
    void
    notify()
    {
        if (not fired_.exchange(true))
            semaphore_.release();
    }

    /**
     * @brief Waits until notify() has been called.
     *
     * @param timeout Maximum time to wait
     * @return true if the signal was received before the timeout expired
     */
    [[nodiscard]] bool
    wait(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000})
    {
        return semaphore_.try_acquire_for(timeout);
    }

private:
    std::atomic<bool> fired_{false};
    std::binary_semaphore semaphore_{0};
};

}  // namespace

struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockBackendTest {
    std::unique_ptr<NiceMockWriterState> writerState = std::make_unique<NiceMockWriterState>();
    // Stays valid after writerState is moved into the service: the pointee is not destroyed by the move.
    NiceMockWriterState& writerStateRef = *writerState;
    std::unique_ptr<NiceMockLedgerCacheLoadingState> cacheLoadingState =
        std::make_unique<NiceMockLedgerCacheLoadingState>();

    static constexpr std::chrono::milliseconds kSHORT_INTERVAL{1};

    /// Builds a uuid whose every byte equals @p value.
    static boost::uuids::uuid
    makeUuid(uint8_t value)
    {
        boost::uuids::uuid uuid{};
        std::ranges::fill(uuid, value);
        return uuid;
    }

    /// Builds a node with the given uuid and role, ETL started, a full cache and no cache load in progress.
    static ClioNode
    makeNode(boost::uuids::uuid const& uuid, ClioNode::DbRole role)
    {
        return ClioNode{
            .uuid = std::make_shared<boost::uuids::uuid>(uuid),
            .updateTime = std::chrono::system_clock::now(),
            .dbRole = role,
            .etlStarted = true,
            .cacheIsFull = true,
            .cacheIsCurrentlyLoading = false,
        };
    }

    /// Serializes @p node to a JSON string via boost::json::value_from.
    static std::string
    nodeToJson(ClioNode const& node)
    {
        boost::json::value const v = boost::json::value_from(node);
        return boost::json::serialize(v);
    }

    /// Builds a backend fetch result containing exactly one node with the given uuid and role.
    static BackendInterface::ClioNodesDataFetchResult
    makeFetchResult(boost::uuids::uuid const& uuid, ClioNode::DbRole role)
    {
        return BackendInterface::ClioNodesDataFetchResult{
            std::vector<std::pair<boost::uuids::uuid, std::string>>{{uuid, nodeToJson(makeNode(uuid, role))}}
        };
    }

    ClusterCommunicationServiceTest()
    {
        // By default the writer state and all of its clones report isReadOnly() == false and isWriting() == true.
        ON_CALL(writerStateRef, clone()).WillByDefault(testing::Invoke([]() {
            auto state = std::make_unique<NiceMockWriterState>();
            ON_CALL(*state, isReadOnly()).WillByDefault(testing::Return(false));
            ON_CALL(*state, isWriting()).WillByDefault(testing::Return(true));
            return state;
        }));
        ON_CALL(writerStateRef, isReadOnly()).WillByDefault(testing::Return(false));
        ON_CALL(writerStateRef, isWriting()).WillByDefault(testing::Return(true));

        ON_CALL(*cacheLoadingState, clone()).WillByDefault(testing::Invoke([]() {
            return std::make_unique<NiceMockLedgerCacheLoadingState>();
        }));
    }

    /**
     * @brief Replaces the default clone() so every cloned writer state notifies @p signal when
     *        startWriting() or giveUpWriting() is called on it.
     *
     * Unlike the default clone() set up in the constructor, these clones do not stub isReadOnly()/isWriting().
     * Must be called before writerState is handed to the service; @p signal must outlive the service.
     */
    void
    notifyOnWriterAction(OneShotSignal& signal)
    {
        ON_CALL(writerStateRef, clone()).WillByDefault(testing::Invoke([&signal]() {
            auto state = std::make_unique<NiceMockWriterState>();
            ON_CALL(*state, startWriting()).WillByDefault(testing::Invoke([&signal]() { signal.notify(); }));
            ON_CALL(*state, giveUpWriting()).WillByDefault(testing::Invoke([&signal]() { signal.notify(); }));
            return state;
        }));
    }
};

TEST_F(ClusterCommunicationServiceTest, BackendReadsAndWritesData)
{
    auto const otherUuid = makeUuid(0x02);
    OneShotSignal fetchSignal;
    OneShotSignal writeSignal;

    auto const fetchResult = makeFetchResult(otherUuid, ClioNode::DbRole::Writer);
    ON_CALL(*backend_, fetchClioNodesData).WillByDefault(testing::Invoke([&](auto) {
        fetchSignal.notify();
        return fetchResult;
    }));
    ON_CALL(*backend_, writeNodeMessage).WillByDefault(testing::Invoke([&](auto, auto) { writeSignal.notify(); }));

    ClusterCommunicationService service{
        backend_, std::move(writerState), std::move(cacheLoadingState), kSHORT_INTERVAL, kSHORT_INTERVAL
    };
    service.run();

    EXPECT_TRUE(fetchSignal.wait());
    EXPECT_TRUE(writeSignal.wait());

    service.stop();
}

TEST_F(ClusterCommunicationServiceTest, MetricsGetsNewStateFromBackend)
{
    auto const otherUuid = makeUuid(0x02);
    OneShotSignal writerActionSignal;

    auto const fetchResult = makeFetchResult(otherUuid, ClioNode::DbRole::Writer);
    ON_CALL(*backend_, fetchClioNodesData).WillByDefault(testing::Invoke([&](auto) { return fetchResult; }));
    notifyOnWriterAction(writerActionSignal);

    auto& nodesInClusterMetric = PrometheusService::gaugeInt("cluster_nodes_total_number", {});
    auto isHealthyMetric = PrometheusService::boolMetric("cluster_communication_is_healthy", {});

    ClusterCommunicationService service{
        backend_, std::move(writerState), std::move(cacheLoadingState), kSHORT_INTERVAL, kSHORT_INTERVAL
    };
    service.run();

    // WriterDecider is called after metrics are updated so we could use it as a signal to stop
    EXPECT_TRUE(writerActionSignal.wait());
    service.stop();

    // The backend returns one node; the expected total of 2 presumably also counts this node itself.
    EXPECT_EQ(nodesInClusterMetric.value(), 2);
    EXPECT_TRUE(static_cast<bool>(isHealthyMetric));
}

TEST_F(ClusterCommunicationServiceTest, WriterDeciderCallsWriterStateMethodsAccordingly)
{
    auto const smallerUuid = makeUuid(0x00);
    OneShotSignal fetchSignal;
    OneShotSignal writerActionSignal;

    auto const fetchResult = makeFetchResult(smallerUuid, ClioNode::DbRole::Writer);
    ON_CALL(*backend_, fetchClioNodesData).WillByDefault(testing::Invoke([&](auto) {
        fetchSignal.notify();
        return fetchResult;
    }));
    ON_CALL(*backend_, writeNodeMessage).WillByDefault(testing::Return());
    notifyOnWriterAction(writerActionSignal);

    ClusterCommunicationService service{
        backend_, std::move(writerState), std::move(cacheLoadingState), kSHORT_INTERVAL, kSHORT_INTERVAL
    };
    service.run();

    EXPECT_TRUE(fetchSignal.wait());
    EXPECT_TRUE(writerActionSignal.wait());

    service.stop();
}

TEST_F(ClusterCommunicationServiceTest, StopHaltsBackendOperations)
{
    std::atomic<int> backendOperationsCount{0};
    OneShotSignal fetchSignal;

    BackendInterface::ClioNodesDataFetchResult const fetchResult{
        std::vector<std::pair<boost::uuids::uuid, std::string>>{}
    };
    ON_CALL(*backend_, fetchClioNodesData).WillByDefault(testing::Invoke([&](auto) {
        ++backendOperationsCount;
        fetchSignal.notify();
        return fetchResult;
    }));
    ON_CALL(*backend_, writeNodeMessage).WillByDefault(testing::Invoke([&](auto&&, auto&&) {
        ++backendOperationsCount;
    }));

    ClusterCommunicationService service{
        backend_, std::move(writerState), std::move(cacheLoadingState), kSHORT_INTERVAL, kSHORT_INTERVAL
    };
    service.run();

    EXPECT_TRUE(fetchSignal.wait());
    service.stop();

    // No backend call may happen after stop() returns, so the counter must not move during the grace period.
    auto const countAfterStop = backendOperationsCount.load();
    std::this_thread::sleep_for(std::chrono::milliseconds{50});
    EXPECT_EQ(backendOperationsCount.load(), countAfterStop);
}

struct ClusterCommunicationServiceMakeTestBundle {
    std::string testName;
    bool limitLoadInCluster;
};

struct ClusterCommunicationServiceMakeTest : util::prometheus::WithPrometheus,
                                             MockBackendTest,
                                             testing::WithParamInterface<ClusterCommunicationServiceMakeTestBundle> {
    std::shared_ptr<etl::SystemState> systemState = std::make_shared<etl::SystemState>();
};

// The test below asserts isLoadingAllowed() == not limitLoadInCluster, so enabling
// cache.limit_load_in_cluster means loading is NOT allowed; the case names reflect that.
INSTANTIATE_TEST_SUITE_P(
    LimitLoadInCluster,
    ClusterCommunicationServiceMakeTest,
    testing::Values(
        ClusterCommunicationServiceMakeTestBundle{
            .testName = "DoesNotAllowLoadingWhenTrue",
            .limitLoadInCluster = true
        },
        ClusterCommunicationServiceMakeTestBundle{.testName = "AllowsLoadingWhenFalse", .limitLoadInCluster = false}
    ),
    tests::util::kNAME_GENERATOR
);

TEST_P(ClusterCommunicationServiceMakeTest, LoadingAllowedMatchesConfig)
{
    auto const& param = GetParam();

    util::config::ClioConfigDefinition const config{
        {{"cache.limit_load_in_cluster",
          util::config::ConfigValue{util::config::ConfigType::Boolean}.defaultValue(param.limitLoadInCluster)}}
    };

    auto result = ClusterCommunicationService::make(config, backend_, systemState);

    ASSERT_NE(result.cacheLoadingState, nullptr);
    EXPECT_EQ(result.cacheLoadingState->isLoadingAllowed(), not param.limitLoadInCluster);
}