Files
clio/tests/unit/cluster/BackendTests.cpp
2026-03-24 15:25:32 +00:00

569 lines
20 KiB
C++

#include "cluster/Backend.hpp"
#include "cluster/ClioNode.hpp"
#include "data/BackendInterface.hpp"
#include "util/MockBackendTestFixture.hpp"
#include "util/MockLedgerCacheLoadingState.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockWriterState.hpp"
#include <boost/asio/thread_pool.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/value_to.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include <semaphore>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
using namespace cluster;
struct ClusterBackendTest : util::prometheus::WithPrometheus, MockBackendTestStrict {
~ClusterBackendTest() override
{
ctx.stop();
ctx.join();
}
boost::asio::thread_pool ctx;
std::unique_ptr<MockWriterState> writerState = std::make_unique<MockWriterState>();
MockWriterState& writerStateRef = *writerState;
std::unique_ptr<NiceMockLedgerCacheLoadingState> cacheLoadingState =
std::make_unique<NiceMockLedgerCacheLoadingState>();
testing::StrictMock<
testing::MockFunction<void(ClioNode::CUuid, std::shared_ptr<Backend::ClusterData const>)>>
callbackMock;
std::binary_semaphore semaphore{0};
class SemaphoreReleaseGuard {
std::binary_semaphore& semaphore_;
public:
SemaphoreReleaseGuard(std::binary_semaphore& s) : semaphore_(s)
{
}
~SemaphoreReleaseGuard()
{
semaphore_.release();
}
};
};
/**
 * With a default-constructed fetch result from the database mock, a subscriber is expected to be
 * handed a cluster state that holds exactly one node: this instance, reported as read-only.
 */
TEST_F(ClusterBackendTest, SubscribeToNewState)
{
    // Both intervals are 1 ms so the periodic work fires almost immediately.
    Backend clusterBackend{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds(1),
        std::chrono::milliseconds(1)
    };
    clusterBackend.subscribeToNewState(callbackMock.AsStdFunction());

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(BackendInterface::ClioNodesDataFetchResult{}));
    EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1));
    EXPECT_CALL(writerStateRef, isReadOnly)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(false));
    EXPECT_CALL(callbackMock, Call)
        .Times(testing::AtLeast(1))
        .WillRepeatedly([this](
                            ClioNode::CUuid selfId,
                            std::shared_ptr<Backend::ClusterData const> clusterData
                        ) {
            // Wakes the test body on every exit path, including an early ASSERT_* return.
            SemaphoreReleaseGuard const guard{semaphore};
            ASSERT_TRUE(clusterData->has_value());
            // Fatal on purpose: front() below must never run on an empty container.
            ASSERT_EQ(clusterData->value().size(), 1);
            auto const& nodeData = clusterData->value().front();
            EXPECT_EQ(nodeData.uuid, selfId);
            EXPECT_EQ(nodeData.dbRole, ClioNode::DbRole::ReadOnly);
            EXPECT_LE(nodeData.updateTime, std::chrono::system_clock::now());
        });

    clusterBackend.run();
    // Block until the subscription callback has fired at least once.
    semaphore.acquire();
}
/**
 * Runs the backend briefly, stops it, and then checks that the database mock is left alone
 * afterwards.
 */
TEST_F(ClusterBackendTest, Stop)
{
    using testing::AtLeast;
    using testing::Return;

    auto const settleTime = std::chrono::milliseconds{20};

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };

    // Database mock: reads yield a default-constructed result, writes are merely counted.
    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(BackendInterface::ClioNodesDataFetchResult{}));
    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1));

    // Writer state mock: a read-only node with ETL not started and the cache not full.
    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));

    cluster.run();
    std::this_thread::sleep_for(settleTime);
    cluster.stop();

    testing::Mock::VerifyAndClearExpectations(&(*backend_));

    // Linger after the expectations were cleared so that any further call on the database mock
    // would surface (presumably as a strict-mock failure, given the MockBackendTestStrict base).
    std::this_thread::sleep_for(settleTime);
}
/**
 * When reading the nodes table throws, subscribers are expected to receive an error state
 * carrying the message "Failed to fetch Clio nodes data".
 */
TEST_F(ClusterBackendTest, FetchClioNodesDataThrowsException)
{
    using testing::AtLeast;
    using testing::Return;
    using testing::Throw;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };
    cluster.subscribeToNewState(callbackMock.AsStdFunction());

    // Checks performed on every state delivered to the subscriber.
    auto const expectFetchFailure =
        [this](ClioNode::CUuid, std::shared_ptr<Backend::ClusterData const> state) {
            SemaphoreReleaseGuard const releaseOnExit{semaphore};
            ASSERT_FALSE(state->has_value());
            EXPECT_EQ(state->error(), "Failed to fetch Clio nodes data");
        };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(Throw(std::runtime_error("Database connection failed")));
    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1));

    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(callbackMock, Call).Times(AtLeast(1)).WillRepeatedly(expectFetchFailure);

    cluster.run();
    // Resume only after the subscriber has been notified at least once.
    semaphore.acquire();
}
/**
 * The database reports one foreign node (stored with db_role 2). The state delivered to
 * subscribers is expected to contain two nodes: this instance as NotWriter and the foreign one
 * as Writer.
 */
TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsDataWithOtherNodes)
{
    using testing::AtLeast;
    using testing::Return;
    using NodeRows = std::vector<std::pair<boost::uuids::uuid, std::string>>;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };
    cluster.subscribeToNewState(callbackMock.AsStdFunction());

    auto const otherUuid = boost::uuids::random_generator{}();
    auto const otherNodeJson = R"JSON({
"db_role": 2,
"update_time": "2025-01-15T10:30:00Z",
"etl_started": false,
"cache_is_full": false,
"cache_is_currently_loading": false
})JSON";

    // Checks performed on every state delivered to the subscriber.
    auto const verifyBothNodes = [&](ClioNode::CUuid selfId,
                                     std::shared_ptr<Backend::ClusterData const> state) {
        SemaphoreReleaseGuard const releaseOnExit{semaphore};
        ASSERT_TRUE(state->has_value()) << state->error();
        EXPECT_EQ(state->value().size(), 2);
        EXPECT_EQ(selfId, cluster.selfId());

        bool sawSelf = false;
        bool sawOther = false;
        for (auto const& member : state->value()) {
            if (*member.uuid == *selfId) {
                sawSelf = true;
                EXPECT_EQ(member.dbRole, ClioNode::DbRole::NotWriter);
            } else if (*member.uuid == otherUuid) {
                sawOther = true;
                EXPECT_EQ(member.dbRole, ClioNode::DbRole::Writer);
            }
            // No node may carry a timestamp from the future.
            EXPECT_LE(member.updateTime, std::chrono::system_clock::now());
        }
        EXPECT_TRUE(sawSelf);
        EXPECT_TRUE(sawOther);
    };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(
            Return(BackendInterface::ClioNodesDataFetchResult{NodeRows{{otherUuid, otherNodeJson}}})
        );
    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1));

    // Every writer-state query answers false.
    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isFallback).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isFallbackRecovery)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isWriting).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(callbackMock, Call).Times(AtLeast(1)).WillRepeatedly(verifyBothNodes);

    cluster.run();
    // Resume only after the subscriber has been notified at least once.
    semaphore.acquire();
}
/**
 * The database already holds a row for this instance's own uuid. The state delivered to
 * subscribers is expected to contain a single node: this instance, reported as read-only.
 */
TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsOnlySelfData)
{
    Backend clusterBackend{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds(1),
        std::chrono::milliseconds(1)
    };
    clusterBackend.subscribeToNewState(callbackMock.AsStdFunction());

    auto const selfNodeJson = R"JSON({
"db_role": 1,
"update_time": "2025-01-16T10:30:00Z",
"etl_started": false,
"cache_is_full": false,
"cache_is_currently_loading": false
})JSON";

    // A lambda rather than testing::Return so the uuid is read from the Backend on each call.
    EXPECT_CALL(*backend_, fetchClioNodesData).Times(testing::AtLeast(1)).WillRepeatedly([&]() {
        return BackendInterface::ClioNodesDataFetchResult{
            std::vector<std::pair<boost::uuids::uuid, std::string>>{
                {*clusterBackend.selfId(), selfNodeJson}
            }
        };
    });
    EXPECT_CALL(*backend_, writeNodeMessage).Times(testing::AtLeast(1));
    EXPECT_CALL(writerStateRef, isReadOnly)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull)
        .Times(testing::AtLeast(1))
        .WillRepeatedly(testing::Return(false));
    EXPECT_CALL(callbackMock, Call)
        .Times(testing::AtLeast(1))
        .WillRepeatedly([this](
                            ClioNode::CUuid selfId,
                            std::shared_ptr<Backend::ClusterData const> clusterData
                        ) {
            // Wakes the test body on every exit path, including an early ASSERT_* return.
            SemaphoreReleaseGuard const guard{semaphore};
            ASSERT_TRUE(clusterData->has_value());
            // Fatal on purpose: front() below must never run on an empty container.
            ASSERT_EQ(clusterData->value().size(), 1);
            auto const& nodeData = clusterData->value().front();
            EXPECT_EQ(nodeData.uuid, selfId);
            EXPECT_EQ(nodeData.dbRole, ClioNode::DbRole::ReadOnly);
            EXPECT_LE(nodeData.updateTime, std::chrono::system_clock::now());
        });

    clusterBackend.run();
    // Block until the subscription callback has fired at least once.
    semaphore.acquire();
}
/**
 * A row whose payload is not parseable JSON is expected to turn the delivered state into an
 * error that mentions both the parse failure and the offending payload.
 */
TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsInvalidJson)
{
    using testing::AtLeast;
    using testing::HasSubstr;
    using testing::Return;
    using NodeRows = std::vector<std::pair<boost::uuids::uuid, std::string>>;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };
    cluster.subscribeToNewState(callbackMock.AsStdFunction());

    auto const otherUuid = boost::uuids::random_generator{}();
    auto const invalidJson = "{ invalid json";

    // Checks performed on every state delivered to the subscriber.
    auto const expectParseError =
        [this, invalidJson](ClioNode::CUuid, std::shared_ptr<Backend::ClusterData const> state) {
            SemaphoreReleaseGuard const releaseOnExit{semaphore};
            ASSERT_FALSE(state->has_value());
            EXPECT_THAT(state->error(), HasSubstr("Error parsing json from DB"));
            EXPECT_THAT(state->error(), HasSubstr(invalidJson));
        };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(
            Return(BackendInterface::ClioNodesDataFetchResult{NodeRows{{otherUuid, invalidJson}}})
        );
    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1));

    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(callbackMock, Call).Times(AtLeast(1)).WillRepeatedly(expectParseError);

    cluster.run();
    // Resume only after the subscriber has been notified at least once.
    semaphore.acquire();
}
/**
 * A row whose payload is well-formed JSON but does not describe a ClioNode is expected to turn
 * the delivered state into an error mentioning the failed conversion.
 */
TEST_F(ClusterBackendTest, FetchClioNodesDataReturnsValidJsonButCannotConvertToClioNode)
{
    using testing::AtLeast;
    using testing::HasSubstr;
    using testing::Return;
    using NodeRows = std::vector<std::pair<boost::uuids::uuid, std::string>>;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };
    cluster.subscribeToNewState(callbackMock.AsStdFunction());

    auto const otherUuid = boost::uuids::random_generator{}();
    // Parses fine, but db_role is a string where an integer is expected.
    auto const jsonWithStringDbRole = R"JSON({
"update_time": "2025-01-16T10:30:00Z",
"db_role": "writer"
})JSON";

    // Checks performed on every state delivered to the subscriber.
    auto const expectConversionError =
        [this](ClioNode::CUuid, std::shared_ptr<Backend::ClusterData const> state) {
            SemaphoreReleaseGuard const releaseOnExit{semaphore};
            ASSERT_FALSE(state->has_value());
            EXPECT_THAT(state->error(), HasSubstr("Error converting json to ClioNode"));
        };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(
            BackendInterface::ClioNodesDataFetchResult{NodeRows{{otherUuid, jsonWithStringDbRole}}}
        ));
    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1));

    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(callbackMock, Call).Times(AtLeast(1)).WillRepeatedly(expectConversionError);

    cluster.run();
    // Resume only after the subscriber has been notified at least once.
    semaphore.acquire();
}
/**
 * Inspects the message written to the database: it must be keyed by this instance's uuid,
 * deserialize into a ClioNode with role NotWriter, and carry an update time that lies between
 * the start of the test (floored to whole seconds) and the moment the write was observed.
 */
TEST_F(ClusterBackendTest, WriteNodeMessageWritesSelfDataWithRecentTimestampAndDbRole)
{
    using testing::AtLeast;
    using testing::Return;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };

    // Lower bound for the written timestamp, floored to whole seconds.
    auto const earliest =
        std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now());

    // Checks performed on every message handed to the database mock.
    auto const inspectWrite = [&](boost::uuids::uuid const& key, std::string payload) {
        SemaphoreReleaseGuard const releaseOnExit{semaphore};
        auto const latest = std::chrono::system_clock::now();

        EXPECT_EQ(key, *cluster.selfId());

        auto const parsed = boost::json::parse(payload);
        auto const written = boost::json::try_value_to<ClioNode>(parsed);
        ASSERT_TRUE(written.has_value());

        EXPECT_EQ(written->dbRole, ClioNode::DbRole::NotWriter);
        EXPECT_GE(written->updateTime, earliest);
        EXPECT_LE(written->updateTime, latest);
    };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(BackendInterface::ClioNodesDataFetchResult{}));

    // Every writer-state query answers false.
    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isFallback).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isFallbackRecovery)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isWriting).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1)).WillRepeatedly(inspectWrite);

    cluster.run();
    // Resume only after at least one write has been inspected.
    semaphore.acquire();
}
/**
 * While the cache loading state reports "currently loading", the message written to the
 * database must deserialize into a ClioNode whose cacheIsCurrentlyLoading flag is set.
 */
TEST_F(ClusterBackendTest, WriteNodeMessageReflectsCacheIsCurrentlyLoading)
{
    using testing::AtLeast;
    using testing::Return;

    // Grab the mock before ownership of the pointer is handed to the Backend below.
    auto& loadingStateMock = *cacheLoadingState;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };

    // Checks performed on every message handed to the database mock.
    auto const inspectWrite = [&](boost::uuids::uuid const&, std::string payload) {
        SemaphoreReleaseGuard const releaseOnExit{semaphore};
        auto const parsed = boost::json::parse(payload);
        auto const written = boost::json::try_value_to<ClioNode>(parsed);
        ASSERT_TRUE(written.has_value());
        EXPECT_TRUE(written->cacheIsCurrentlyLoading);
    };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(BackendInterface::ClioNodesDataFetchResult{}));

    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(loadingStateMock, isCurrentlyLoading)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(true));

    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1)).WillRepeatedly(inspectWrite);

    cluster.run();
    // Resume only after at least one write has been inspected.
    semaphore.acquire();
}
/**
 * While the cache loading state reports "currently loading", the single self node delivered to
 * subscribers must have its cacheIsCurrentlyLoading flag set.
 */
TEST_F(ClusterBackendTest, SubscribeToNewStateReflectsCacheIsCurrentlyLoading)
{
    using testing::AtLeast;
    using testing::Return;

    // Grab the mock before ownership of the pointer is handed to the Backend below.
    auto& loadingStateMock = *cacheLoadingState;

    Backend cluster{
        ctx,
        backend_,
        std::move(writerState),
        std::move(cacheLoadingState),
        std::chrono::milliseconds{1},
        std::chrono::milliseconds{1}
    };
    cluster.subscribeToNewState(callbackMock.AsStdFunction());

    // Checks performed on every state delivered to the subscriber.
    auto const verifySelfIsLoading =
        [this](ClioNode::CUuid selfId, std::shared_ptr<Backend::ClusterData const> state) {
            SemaphoreReleaseGuard const releaseOnExit{semaphore};
            ASSERT_TRUE(state->has_value());
            // Fatal check: front() is only meaningful when exactly one node is present.
            ASSERT_EQ(state->value().size(), 1);
            auto const& self = state->value().front();
            EXPECT_EQ(self.uuid, selfId);
            EXPECT_TRUE(self.cacheIsCurrentlyLoading);
        };

    EXPECT_CALL(*backend_, fetchClioNodesData)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(BackendInterface::ClioNodesDataFetchResult{}));
    EXPECT_CALL(*backend_, writeNodeMessage).Times(AtLeast(1));

    EXPECT_CALL(writerStateRef, isReadOnly).Times(AtLeast(1)).WillRepeatedly(Return(true));
    EXPECT_CALL(writerStateRef, isEtlStarted).Times(AtLeast(1)).WillRepeatedly(Return(false));
    EXPECT_CALL(writerStateRef, isCacheFull).Times(AtLeast(1)).WillRepeatedly(Return(false));

    EXPECT_CALL(loadingStateMock, isCurrentlyLoading)
        .Times(AtLeast(1))
        .WillRepeatedly(Return(true));

    EXPECT_CALL(callbackMock, Call).Times(AtLeast(1)).WillRepeatedly(verifySelfIsLoading);

    cluster.run();
    // Resume only after the subscriber has been notified at least once.
    semaphore.acquire();
}