Files
clio/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp
2026-03-24 15:25:32 +00:00

377 lines
15 KiB
C++

#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/LedgerCache.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "migration/MigrationManagerInterface.hpp"
#include "migration/MigratiorStatus.hpp"
#include "migration/cassandra/CassandraMigrationTestBackend.hpp"
#include "migration/cassandra/DBRawData.hpp"
#include "migration/cassandra/ExampleDropTableMigrator.hpp"
#include "migration/cassandra/ExampleLedgerMigrator.hpp"
#include "migration/cassandra/ExampleObjectsMigrator.hpp"
#include "migration/cassandra/ExampleTransactionsMigrator.hpp"
#include "migration/impl/MigrationManagerBase.hpp"
#include "migration/impl/MigratorsRegister.hpp"
#include "util/CassandraDBHelper.hpp"
#include "util/MockPrometheus.hpp"
#include "util/config/ConfigConstraints.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
#include "util/config/Types.hpp"
#include <TestGlobals.hpp>
#include <gtest/gtest.h>
#include <xrpl/basics/base_uint.h>
#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <utility>
using namespace util;
using namespace std;
using namespace prometheus;
using namespace data::cassandra;
using namespace migration;
using namespace util::config;
// Register the migrators used by these tests. The template arguments are the test backend type
// followed by the four example migrator types. The GetAllMigratorNames test below expects the
// manager to report the migrator names in exactly this order.
using CassandraSupportedTestMigrators = migration::impl::MigratorsRegister<
CassandraMigrationTestBackend,
ExampleObjectsMigrator,
ExampleTransactionsMigrator,
ExampleLedgerMigrator,
ExampleDropTableMigrator>;
// Define the test migration manager: the generic manager base instantiated with the test
// migrators register declared above. This is the concrete type created behind the
// MigrationManagerInterface pointer that the fixtures hold.
using CassandraMigrationTestManager =
migration::impl::MigrationManagerBase<CassandraSupportedTestMigrators>;
namespace {

/**
 * @brief Build the migration manager under test together with the backend it operates on.
 *
 * The backend is created from the "database.cassandra" section of @p config and the manager from
 * the "migration" section. Both returned pointers refer to the same backend instance.
 *
 * @param config The test configuration
 * @return A pair of {migration manager, test backend}
 */
std::pair<
    std::shared_ptr<migration::MigrationManagerInterface>,
    std::shared_ptr<CassandraMigrationTestBackend>>
makeMigrationTestManagerAndBackend(ClioConfigDefinition const& config)
{
    auto const cfg = config.getObject("database.cassandra");

    // The cache is handed to the backend as a non-const lvalue. NOTE(review): the backend
    // constructor is not visible here; presumably it keeps a reference to the cache. A cache that
    // lives on this function's stack would then dangle as soon as we return, so the cache is kept
    // alive by the backend's deleter and released only after the backend itself is destroyed.
    auto const cache = std::make_shared<data::LedgerCache>();
    auto const backendPtr = std::shared_ptr<CassandraMigrationTestBackend>(
        new CassandraMigrationTestBackend(data::cassandra::SettingsProvider{cfg}, *cache),
        [cache](CassandraMigrationTestBackend* backend) { delete backend; }
    );

    return std::make_pair(
        std::make_shared<CassandraMigrationTestManager>(backendPtr, config.getObject("migration")),
        backendPtr
    );
}

}  // namespace
/**
 * @brief Base fixture for the Cassandra migration manager integration tests.
 *
 * The constructor builds the migration manager and its backend from cfg_, SetUp() invokes the
 * setupDatabase() hook, and the destructor drops the keyspace named by TestGlobals.
 */
class MigrationCassandraSimpleTest : public WithPrometheus {
    /**
     * @brief Hook used to prepare the database before a test body runs.
     *
     * Called from SetUp(). The default does nothing; derived fixtures override it to write the
     * data their tests need.
     */
    virtual void
    setupDatabase()
    {
    }

protected:
    static constexpr auto kCASSANDRA = "cassandra";

    // NOTE(review): "database.cassandra.replication_factor" and
    // "database.cassandra.connect_timeout" are each defined more than once below with different
    // settings. Which definition takes effect depends on how ClioConfigDefinition treats repeated
    // keys - confirm and drop the losing entries. (An exact duplicate of the first
    // replication_factor entry has already been removed.)
    ClioConfigDefinition cfg_{
        {{"database.type", ConfigValue{ConfigType::String}.defaultValue(kCASSANDRA)},
         {"database.cassandra.contact_points",
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
         {"database.cassandra.keyspace",
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
         {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kCASSANDRA)},
         {"database.cassandra.replication_factor",
          ConfigValue{ConfigType::Integer}.defaultValue(1)},
         {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},
         {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.port",
          ConfigValue{ConfigType::Integer}.withConstraint(gValidatePort).optional()},
         // NOTE(review): conflicts with the replication_factor entry above (default 1 vs 3u)
         {"database.cassandra.replication_factor",
          ConfigValue{ConfigType::Integer}.defaultValue(3u).withConstraint(gValidateUint16)},
         {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.max_write_requests_outstanding",
          ConfigValue{ConfigType::Integer}.defaultValue(10'000).withConstraint(gValidateUint32)},
         {"database.cassandra.max_read_requests_outstanding",
          ConfigValue{ConfigType::Integer}.defaultValue(100'000).withConstraint(gValidateUint32)},
         {"database.cassandra.threads",
          ConfigValue{ConfigType::Integer}
              .defaultValue(static_cast<uint32_t>(std::thread::hardware_concurrency()))
              .withConstraint(gValidateUint32)},
         {"database.cassandra.core_connections_per_host",
          ConfigValue{ConfigType::Integer}.defaultValue(1).withConstraint(gValidateUint16)},
         {"database.cassandra.queue_size_io",
          ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint16)},
         {"database.cassandra.write_batch_size",
          ConfigValue{ConfigType::Integer}.defaultValue(20).withConstraint(gValidateUint16)},
         // NOTE(review): conflicts with the connect_timeout entry above (default 2 vs optional)
         {"database.cassandra.connect_timeout",
          ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
         {"database.cassandra.request_timeout",
          ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
         {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
         {"migration.full_scan_threads",
          ConfigValue{ConfigType::Integer}.defaultValue(2).withConstraint(gValidateUint32)},
         {"migration.full_scan_jobs",
          ConfigValue{ConfigType::Integer}.defaultValue(4).withConstraint(gValidateUint32)},
         {"migration.cursors_per_job",
          ConfigValue{ConfigType::Integer}.defaultValue(100).withConstraint(gValidateUint32)}}
    };

    // The manager under test and the backend it was constructed with
    std::shared_ptr<migration::MigrationManagerInterface> testMigrationManager_;
    std::shared_ptr<CassandraMigrationTestBackend> testMigrationBackend_;

    // Runs the fixture-specific database preparation before each test body
    void
    SetUp() override
    {
        setupDatabase();
    }

public:
    MigrationCassandraSimpleTest()
    {
        // Move the freshly created pointers into the members instead of copying them
        auto [manager, backend] = makeMigrationTestManagerAndBackend(cfg_);
        testMigrationManager_ = std::move(manager);
        testMigrationBackend_ = std::move(backend);
    }

    ~MigrationCassandraSimpleTest() override
    {
        // drop the keyspace; the result of the DROP statement itself is not checked
        Handle const handle{TestGlobals::instance().backendHost};
        EXPECT_TRUE(handle.connect());
        handle.execute("DROP KEYSPACE " + TestGlobals::instance().backendKeyspace);
    }
};
// The test suite for testing the migration manager without any data in the database. It adds
// nothing to MigrationCassandraSimpleTest, so the inherited no-op setupDatabase() hook is used and
// the fixture writes no rows before the test body runs.
struct MigrationCassandraManagerCleanDBTest : public MigrationCassandraSimpleTest {};
// The manager must report the registered migrators by name, in registration order.
TEST_F(MigrationCassandraManagerCleanDBTest, GetAllMigratorNames)
{
    auto const names = testMigrationManager_->allMigratorsNames();

    // Fatal on mismatch: the indexed reads below would be out of bounds with fewer entries
    ASSERT_EQ(names.size(), 4);

    EXPECT_EQ(names[0], "ExampleObjectsMigrator");
    EXPECT_EQ(names[1], "ExampleTransactionsMigrator");
    EXPECT_EQ(names[2], "ExampleLedgerMigrator");
    EXPECT_EQ(names[3], "ExampleDropTableMigrator");
}
// Before any migration has run, every registered migrator must be reported as NotMigrated.
TEST_F(MigrationCassandraManagerCleanDBTest, AllMigratorStatusBeforeAnyMigration)
{
    auto const status = testMigrationManager_->allMigratorsStatusPairs();

    // Fatal on mismatch: the indexed reads below would be out of bounds with fewer entries
    ASSERT_EQ(status.size(), 4);

    // Element 1 of each entry is the migrator's status
    EXPECT_EQ(std::get<1>(status[0]), MigratorStatus::NotMigrated);
    EXPECT_EQ(std::get<1>(status[1]), MigratorStatus::NotMigrated);
    EXPECT_EQ(std::get<1>(status[2]), MigratorStatus::NotMigrated);
    EXPECT_EQ(std::get<1>(status[3]), MigratorStatus::NotMigrated);
}
// Looking a migrator up by name yields NotMigrated for every registered migrator and NotKnown for
// a name that is not registered.
TEST_F(MigrationCassandraManagerCleanDBTest, MigratorStatus)
{
    for (auto const* registeredName :
         {"ExampleObjectsMigrator",
          "ExampleTransactionsMigrator",
          "ExampleLedgerMigrator",
          "ExampleDropTableMigrator"}) {
        EXPECT_EQ(
            testMigrationManager_->getMigratorStatusByName(registeredName),
            MigratorStatus::NotMigrated
        );
    }

    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName("NonExistentMigrator"),
        MigratorStatus::NotKnown
    );
}
// The test suite for testing migration process for ExampleTransactionsMigrator. In this test suite,
// the transactions are written to the database before running the migration
class MigrationCassandraManagerTxTableTest : public MigrationCassandraSimpleTest {
void
setupDatabase() override
{
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
std::ranges::for_each(gTransactionsRawData, [&](auto const& value) {
writeTxFromCSVString(TestGlobals::instance().backendKeyspace, value, handle);
});
}
};
// Runs ExampleTransactionsMigrator over the seeded transactions and checks the migrator's counter,
// the size of the resulting index table, a few migrated transaction types and the final status.
TEST_F(MigrationCassandraManagerTxTableTest, MigrateExampleTransactionsMigrator)
{
    constexpr auto kTRANSACTIONS_MIGRATOR_NAME = "ExampleTransactionsMigrator";
    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kTRANSACTIONS_MIGRATOR_NAME),
        MigratorStatus::NotMigrated
    );

    // Zero the counter first: it is compared against the number of seeded rows below
    ExampleTransactionsMigrator::count = 0;
    testMigrationManager_->runMigration(kTRANSACTIONS_MIGRATOR_NAME);
    EXPECT_EQ(ExampleTransactionsMigrator::count, gTransactionsRawData.size());

    auto const newTableSize = data::synchronous([&](auto ctx) {
        return testMigrationBackend_->fetchTxIndexTableSize(ctx);
    });
    EXPECT_TRUE(newTableSize.has_value());
    EXPECT_EQ(newTableSize, gTransactionsRawData.size());

    // check a few tx types
    auto const getTxType = [&](ripple::uint256 const& txHash) -> std::optional<std::string> {
        return data::synchronous([&](auto ctx) {
            return testMigrationBackend_->fetchTxTypeViaID(uint256ToString(txHash), ctx);
        });
    };

    // has_value() is asserted fatally each time: calling value() on an empty optional would
    // throw std::bad_optional_access instead of producing a clean test failure
    auto txType = getTxType(
        ripple::uint256("CEECF7E516F8A53C5D32A357B737ED54D3186FDD510B1973D908AD8D93AD8E00")
    );
    ASSERT_TRUE(txType.has_value());
    EXPECT_EQ(txType.value(), "OracleSet");

    txType = getTxType(
        ripple::uint256("35DBFB1A88DE17EBD2BCE37F6E1FD6D3B9887C92B7933ED2FCF2A84E9138B7CA")
    );
    ASSERT_TRUE(txType.has_value());
    EXPECT_EQ(txType.value(), "Payment");

    txType = getTxType(
        ripple::uint256("FCACE9D00625FA3BCC5316078324EA153EC8551243AC1701D496CC1CA2B8A474")
    );
    ASSERT_TRUE(txType.has_value());
    EXPECT_EQ(txType.value(), "AMMCreate");

    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kTRANSACTIONS_MIGRATOR_NAME),
        MigratorStatus::Migrated
    );
}
// The test suite for testing migration process for ExampleObjectsMigrator. In this test suite, the
// objects are written to the database before running the migration
class MigrationCassandraManagerObjectsTableTest : public MigrationCassandraSimpleTest {
void
setupDatabase() override
{
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
for (auto const& value : gObjectsRawData) {
writeObjectFromCSVString(TestGlobals::instance().backendKeyspace, value, handle);
}
}
};
// Runs ExampleObjectsMigrator over the seeded objects and checks the migrator's counters and the
// status before and after the migration.
TEST_F(MigrationCassandraManagerObjectsTableTest, MigrateExampleObjectsMigrator)
{
    constexpr auto kOBJECTS_MIGRATOR_NAME = "ExampleObjectsMigrator";
    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kOBJECTS_MIGRATOR_NAME),
        MigratorStatus::NotMigrated
    );

    // Zero the counters before migrating, as the transactions migrator test does with its
    // counter, so the exact-value checks below do not depend on earlier runs in this process.
    // NOTE(review): assumes these are static counters that accumulate across runs - confirm.
    ExampleObjectsMigrator::count = 0;
    ExampleObjectsMigrator::accountCount = 0;
    testMigrationManager_->runMigration(kOBJECTS_MIGRATOR_NAME);

    EXPECT_EQ(ExampleObjectsMigrator::count, gObjectsRawData.size());
    EXPECT_EQ(ExampleObjectsMigrator::accountCount, 37);

    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kOBJECTS_MIGRATOR_NAME),
        MigratorStatus::Migrated
    );
}
// The test suite for testing migration process for ExampleLedgerMigrator. In this test suite, the
// ledger headers are written to ledgers table before running the migration
class MigrationCassandraManagerLedgerTableTest : public MigrationCassandraSimpleTest {
void
setupDatabase() override
{
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
for (auto const& value : gLedgerHeaderRawData) {
writeLedgerFromCSVString(TestGlobals::instance().backendKeyspace, value, handle);
}
writeLedgerRange(TestGlobals::instance().backendKeyspace, 5619393, 5619442, handle);
}
};
// Runs ExampleLedgerMigrator over the seeded ledger headers, then checks the status transition,
// the size of the table read via fetchLedgerTableSize and the account hash for three sequences.
TEST_F(MigrationCassandraManagerLedgerTableTest, MigrateExampleLedgerMigrator)
{
    constexpr auto kHEADER_MIGRATOR_NAME = "ExampleLedgerMigrator";

    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kHEADER_MIGRATOR_NAME),
        MigratorStatus::NotMigrated
    );
    testMigrationManager_->runMigration(kHEADER_MIGRATOR_NAME);
    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kHEADER_MIGRATOR_NAME),
        MigratorStatus::Migrated
    );

    auto const migratedRowCount = data::synchronous([&](auto ctx) {
        return testMigrationBackend_->fetchLedgerTableSize(ctx);
    });
    EXPECT_EQ(migratedRowCount, gLedgerHeaderRawData.size());

    // Fetches the account hash stored for ledgerSeq and compares it with expectedHash
    auto const expectAccountHash = [this](
                                       std::uint32_t ledgerSeq, ripple::uint256 const& expectedHash
                                   ) {
        auto const storedHash = data::synchronous([&](auto ctx) {
            return testMigrationBackend_->fetchAccountHashViaSequence(ledgerSeq, ctx);
        });
        EXPECT_EQ(storedHash, expectedHash);
    };

    expectAccountHash(
        5619393,
        ripple::uint256("D1DE0F83A6858DF52811E31FE97B8449A4DD55A7D9E0023FE5DC2B335E4C49E8")
    );
    expectAccountHash(
        5619394,
        ripple::uint256("3FEF485204772F03842AA8757B4631E8F146E17AD9762E0552540A517DD38A24")
    );
    expectAccountHash(
        5619395,
        ripple::uint256("D0A61C158AD8941868666AD51C4662EEAAA2A141BF0F4435BC22B9BC6783AF65")
    );
}
// The test suite for testing migration process for ExampleDropTableMigrator. It adds nothing to
// MigrationCassandraSimpleTest, so the inherited no-op setupDatabase() hook is used and the fixture
// writes no rows before the test body runs.
class MigrationCassandraManagerDropTableTest : public MigrationCassandraSimpleTest {};
// Runs ExampleDropTableMigrator and checks that fetchDiffTableSize() reports 0 before the
// migration and std::nullopt afterwards, with the status moving from NotMigrated to Migrated.
TEST_F(MigrationCassandraManagerDropTableTest, MigrateDropTableMigrator)
{
    constexpr auto kDROP_TABLE_MIGRATOR_NAME = "ExampleDropTableMigrator";

    // Reads the current size of the diff table through the test backend
    auto const diffTableSize = [this] {
        return data::synchronous([&](auto ctx) {
            return testMigrationBackend_->fetchDiffTableSize(ctx);
        });
    };

    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kDROP_TABLE_MIGRATOR_NAME),
        MigratorStatus::NotMigrated
    );
    EXPECT_EQ(diffTableSize(), 0);

    testMigrationManager_->runMigration(kDROP_TABLE_MIGRATOR_NAME);

    EXPECT_EQ(
        testMigrationManager_->getMigratorStatusByName(kDROP_TABLE_MIGRATOR_NAME),
        MigratorStatus::Migrated
    );
    EXPECT_EQ(diffTableSize(), std::nullopt);
}