feat: Migration framework (#1768)

This PR implemented the migration framework, which contains the command
line interface to execute migration and helps to migrate data easily.
Please read README.md for more information about this framework.
This commit is contained in:
cyan317
2024-12-17 14:50:51 +00:00
committed by GitHub
parent 15a441b084
commit 8dc7f16ef1
69 changed files with 4395 additions and 17 deletions

View File

@@ -0,0 +1,348 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "migration/MigrationManagerInterface.hpp"
#include "migration/MigratiorStatus.hpp"
#include "migration/cassandra/CassandraMigrationTestBackend.hpp"
#include "migration/cassandra/DBRawData.hpp"
#include "migration/cassandra/ExampleDropTableMigrator.hpp"
#include "migration/cassandra/ExampleLedgerMigrator.hpp"
#include "migration/cassandra/ExampleObjectsMigrator.hpp"
#include "migration/cassandra/ExampleTransactionsMigrator.hpp"
#include "migration/impl/MigrationManagerBase.hpp"
#include "migration/impl/MigratorsRegister.hpp"
#include "util/CassandraDBHelper.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockPrometheus.hpp"
#include "util/newconfig/ConfigConstraints.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigValue.hpp"
#include "util/newconfig/Types.hpp"
#include <TestGlobals.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/value.hpp>
#include <fmt/core.h>
#include <gtest/gtest.h>
#include <xrpl/basics/base_uint.h>
#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <utility>
using namespace util;
using namespace std;
using namespace prometheus;
using namespace data::cassandra;
using namespace migration;
using namespace util::config;
// Register the migrators
using CassandraSupportedTestMigrators = migration::impl::MigratorsRegister<
CassandraMigrationTestBackend,
ExampleObjectsMigrator,
ExampleTransactionsMigrator,
ExampleLedgerMigrator,
ExampleDropTableMigrator>;
// Define the test migration manager
using CassandraMigrationTestManager = migration::impl::MigrationManagerBase<CassandraSupportedTestMigrators>;
namespace {
std::pair<std::shared_ptr<migration::MigrationManagerInterface>, std::shared_ptr<CassandraMigrationTestBackend>>
make_MigrationTestManagerAndBackend(ClioConfigDefinition const& config)
{
auto const cfg = config.getObject("database.cassandra");
auto const backendPtr = std::make_shared<CassandraMigrationTestBackend>(data::cassandra::SettingsProvider{cfg});
return std::make_pair(
std::make_shared<CassandraMigrationTestManager>(backendPtr, config.getObject("migration")), backendPtr
);
}
} // namespace
class MigrationCassandraSimpleTest : public WithPrometheus, public NoLoggerFixture {
// This function is used to prepare the database before running the tests
// It is called in the SetUp function. Different tests can override this function to prepare the database
// differently
virtual void
setupDatabase()
{
}
protected:
ClioConfigDefinition cfg{
{{"database.type", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.contact_points",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},
{"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.withConstraint(validatePort).optional()},
{"database.cassandra.replication_factor",
ConfigValue{ConfigType::Integer}.defaultValue(3u).withConstraint(validateUint16)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding",
ConfigValue{ConfigType::Integer}.defaultValue(10'000).withConstraint(validateUint32)},
{"database.cassandra.max_read_requests_outstanding",
ConfigValue{ConfigType::Integer}.defaultValue(100'000).withConstraint(validateUint32)},
{"database.cassandra.threads",
ConfigValue{ConfigType::Integer}
.defaultValue(static_cast<uint32_t>(std::thread::hardware_concurrency()))
.withConstraint(validateUint32)},
{"database.cassandra.core_connections_per_host",
ConfigValue{ConfigType::Integer}.defaultValue(1).withConstraint(validateUint16)},
{"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional().withConstraint(validateUint16)
},
{"database.cassandra.write_batch_size",
ConfigValue{ConfigType::Integer}.defaultValue(20).withConstraint(validateUint16)},
{"database.cassandra.connect_timeout",
ConfigValue{ConfigType::Integer}.optional().withConstraint(validateUint32)},
{"database.cassandra.request_timeout",
ConfigValue{ConfigType::Integer}.optional().withConstraint(validateUint32)},
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
{"migration.full_scan_threads", ConfigValue{ConfigType::Integer}.defaultValue(2).withConstraint(validateUint32)
},
{"migration.full_scan_jobs", ConfigValue{ConfigType::Integer}.defaultValue(4).withConstraint(validateUint32)},
{"migration.cursors_per_job", ConfigValue{ConfigType::Integer}.defaultValue(100).withConstraint(validateUint32)
}}
};
std::shared_ptr<migration::MigrationManagerInterface> testMigrationManager;
std::shared_ptr<CassandraMigrationTestBackend> testMigrationBackend;
MigrationCassandraSimpleTest()
{
auto const testBundle = make_MigrationTestManagerAndBackend(cfg);
testMigrationManager = testBundle.first;
testMigrationBackend = testBundle.second;
}
void
SetUp() override
{
setupDatabase();
}
void
TearDown() override
{
// drop the keyspace
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
handle.execute("DROP KEYSPACE " + TestGlobals::instance().backendKeyspace);
}
};
// The test suite for testing the migration manager without any data in the database
struct MigrationCassandraManagerCleanDBTest : public MigrationCassandraSimpleTest {};
TEST_F(MigrationCassandraManagerCleanDBTest, GetAllMigratorNames)
{
auto const names = testMigrationManager->allMigratorsNames();
EXPECT_EQ(names.size(), 4);
EXPECT_EQ(names[0], "ExampleObjectsMigrator");
EXPECT_EQ(names[1], "ExampleTransactionsMigrator");
EXPECT_EQ(names[2], "ExampleLedgerMigrator");
EXPECT_EQ(names[3], "ExampleDropTableMigrator");
}
TEST_F(MigrationCassandraManagerCleanDBTest, AllMigratorStatusBeforeAnyMigration)
{
auto const status = testMigrationManager->allMigratorsStatusPairs();
EXPECT_EQ(status.size(), 4);
EXPECT_EQ(std::get<1>(status[0]), MigratorStatus::NotMigrated);
EXPECT_EQ(std::get<1>(status[1]), MigratorStatus::NotMigrated);
EXPECT_EQ(std::get<1>(status[2]), MigratorStatus::NotMigrated);
EXPECT_EQ(std::get<1>(status[3]), MigratorStatus::NotMigrated);
}
TEST_F(MigrationCassandraManagerCleanDBTest, MigratorStatus)
{
auto status = testMigrationManager->getMigratorStatusByName("ExampleObjectsMigrator");
EXPECT_EQ(status, MigratorStatus::NotMigrated);
status = testMigrationManager->getMigratorStatusByName("ExampleTransactionsMigrator");
EXPECT_EQ(status, MigratorStatus::NotMigrated);
status = testMigrationManager->getMigratorStatusByName("ExampleLedgerMigrator");
EXPECT_EQ(status, MigratorStatus::NotMigrated);
status = testMigrationManager->getMigratorStatusByName("ExampleDropTableMigrator");
EXPECT_EQ(status, MigratorStatus::NotMigrated);
status = testMigrationManager->getMigratorStatusByName("NonExistentMigrator");
EXPECT_EQ(status, MigratorStatus::NotKnown);
}
// The test suite for testing migration process for ExampleTransactionsMigrator. In this test suite, the transactions
// are written to the database before running the migration
class MigrationCassandraManagerTxTableTest : public MigrationCassandraSimpleTest {
void
setupDatabase() override
{
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
std::for_each(std::begin(TransactionsRawData), std::end(TransactionsRawData), [&](auto const& value) {
writeTxFromCSVString(TestGlobals::instance().backendKeyspace, value, handle);
});
}
};
TEST_F(MigrationCassandraManagerTxTableTest, MigrateExampleTransactionsMigrator)
{
auto constexpr TransactionsMigratorName = "ExampleTransactionsMigrator";
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(TransactionsMigratorName), MigratorStatus::NotMigrated);
ExampleTransactionsMigrator::count = 0;
testMigrationManager->runMigration(TransactionsMigratorName);
EXPECT_EQ(ExampleTransactionsMigrator::count, TransactionsRawData.size());
auto const newTableSize =
data::synchronous([&](auto ctx) { return testMigrationBackend->fetchTxIndexTableSize(ctx); });
EXPECT_TRUE(newTableSize.has_value());
EXPECT_EQ(newTableSize, TransactionsRawData.size());
// check a few tx types
auto const getTxType = [&](ripple::uint256 const& txHash) -> std::optional<std::string> {
return data::synchronous([&](auto ctx) {
return testMigrationBackend->fetchTxTypeViaID(uint256ToString(txHash), ctx);
});
};
auto txType = getTxType(ripple::uint256("CEECF7E516F8A53C5D32A357B737ED54D3186FDD510B1973D908AD8D93AD8E00"));
EXPECT_TRUE(txType.has_value());
EXPECT_EQ(txType.value(), "OracleSet");
txType = getTxType(ripple::uint256("35DBFB1A88DE17EBD2BCE37F6E1FD6D3B9887C92B7933ED2FCF2A84E9138B7CA"));
EXPECT_TRUE(txType.has_value());
EXPECT_EQ(txType.value(), "Payment");
txType = getTxType(ripple::uint256("FCACE9D00625FA3BCC5316078324EA153EC8551243AC1701D496CC1CA2B8A474"));
EXPECT_TRUE(txType.has_value());
EXPECT_EQ(txType.value(), "AMMCreate");
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(TransactionsMigratorName), MigratorStatus::Migrated);
}
// The test suite for testing migration process for ExampleObjectsMigrator. In this test suite, the objects are written
// to the database before running the migration
class MigrationCassandraManagerObjectsTableTest : public MigrationCassandraSimpleTest {
void
setupDatabase() override
{
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
for (auto const& value : ObjectsRawData) {
writeObjectFromCSVString(TestGlobals::instance().backendKeyspace, value, handle);
}
}
};
TEST_F(MigrationCassandraManagerObjectsTableTest, MigrateExampleObjectsMigrator)
{
auto constexpr ObjectsMigratorName = "ExampleObjectsMigrator";
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(ObjectsMigratorName), MigratorStatus::NotMigrated);
testMigrationManager->runMigration(ObjectsMigratorName);
EXPECT_EQ(ExampleObjectsMigrator::count, ObjectsRawData.size());
EXPECT_EQ(ExampleObjectsMigrator::accountCount, 37);
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(ObjectsMigratorName), MigratorStatus::Migrated);
}
// The test suite for testing migration process for ExampleLedgerMigrator. In this test suite, the ledger headers are
// written to ledgers table before running the migration
class MigrationCassandraManagerLedgerTableTest : public MigrationCassandraSimpleTest {
void
setupDatabase() override
{
Handle const handle{TestGlobals::instance().backendHost};
EXPECT_TRUE(handle.connect());
for (auto const& value : LedgerHeaderRawData) {
writeLedgerFromCSVString(TestGlobals::instance().backendKeyspace, value, handle);
}
writeLedgerRange(TestGlobals::instance().backendKeyspace, 5619393, 5619442, handle);
}
};
TEST_F(MigrationCassandraManagerLedgerTableTest, MigrateExampleLedgerMigrator)
{
auto constexpr HeaderMigratorName = "ExampleLedgerMigrator";
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(HeaderMigratorName), MigratorStatus::NotMigrated);
testMigrationManager->runMigration(HeaderMigratorName);
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(HeaderMigratorName), MigratorStatus::Migrated);
auto const newTableSize =
data::synchronous([&](auto ctx) { return testMigrationBackend->fetchLedgerTableSize(ctx); });
EXPECT_EQ(newTableSize, LedgerHeaderRawData.size());
auto const getAccountHash = [this](std::uint32_t seq) {
return data::synchronous([&](auto ctx) { return testMigrationBackend->fetchAccountHashViaSequence(seq, ctx); });
};
EXPECT_EQ(
getAccountHash(5619393), ripple::uint256("D1DE0F83A6858DF52811E31FE97B8449A4DD55A7D9E0023FE5DC2B335E4C49E8")
);
EXPECT_EQ(
getAccountHash(5619394), ripple::uint256("3FEF485204772F03842AA8757B4631E8F146E17AD9762E0552540A517DD38A24")
);
EXPECT_EQ(
getAccountHash(5619395), ripple::uint256("D0A61C158AD8941868666AD51C4662EEAAA2A141BF0F4435BC22B9BC6783AF65")
);
}
// The test suite for testing migration process for ExampleDropTableMigrator.
class MigrationCassandraManagerDropTableTest : public MigrationCassandraSimpleTest {};
TEST_F(MigrationCassandraManagerDropTableTest, MigrateDropTableMigrator)
{
auto constexpr DropTableMigratorName = "ExampleDropTableMigrator";
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(DropTableMigratorName), MigratorStatus::NotMigrated);
auto const beforeDropSize =
data::synchronous([&](auto ctx) { return testMigrationBackend->fetchDiffTableSize(ctx); });
EXPECT_EQ(beforeDropSize, 0);
testMigrationManager->runMigration(DropTableMigratorName);
EXPECT_EQ(testMigrationManager->getMigratorStatusByName(DropTableMigratorName), MigratorStatus::Migrated);
auto const newTableSize =
data::synchronous([&](auto ctx) { return testMigrationBackend->fetchDiffTableSize(ctx); });
EXPECT_EQ(newTableSize, std::nullopt);
}

View File

@@ -0,0 +1,342 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Schema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "migration/cassandra/CassandraMigrationBackend.hpp"
#include <boost/asio/spawn.hpp>
#include <fmt/core.h>
#include <xrpl/basics/base_uint.h>
#include <cstdint>
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
/**
* @brief Test backend for Cassandra migration. The class is mainly to provide an example of how to add the needed
* backend for the migrator. It is used in integration tests to provide the backend for the example migrators. In
* production, the backend code should be added to CassandraMigrationBackend directly.
*/
class CassandraMigrationTestBackend : public migration::cassandra::CassandraMigrationBackend {
data::cassandra::SettingsProvider settingsProvider_;
public:
/**
* @brief Construct a new Cassandra Migration Test Backend object
*
* @param settingsProvider The settings provider for the Cassandra backend
*/
CassandraMigrationTestBackend(data::cassandra::SettingsProvider settingsProvider)
: migration::cassandra::CassandraMigrationBackend(settingsProvider)
, settingsProvider_(std::move(settingsProvider))
{
if (auto const res = handle_.executeEach(createTablesSchema()); not res)
throw std::runtime_error("Could not create schema: " + res.error());
}
/**
* @brief Write a transaction hash and its transaction type to the tx_index_example table. It's used by
* ExampleTransactionsMigrator.
*
* @param hash The transaction hash
* @param txType The transaction type
*/
void
writeTxIndexExample(std::string const& hash, std::string const& txType)
{
auto static insertTxIndexExample = [this]() {
return handle_.prepare(fmt::format(
R"(
INSERT INTO {}
(hash, tx_type)
VALUES (?, ?)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
));
}();
executor_.writeSync(insertTxIndexExample.bind(hash, data::cassandra::Text(txType)));
}
/**
* @brief Fetch the transaction type via transaction hash from the tx_index_example table. It's used by
* ExampleTransactionsMigrator validation.
*
* @param hash The transaction hash
* @param ctx The boost asio context
* @return The transaction type if found, otherwise std::nullopt
*/
std::optional<std::string>
fetchTxTypeViaID(std::string const& hash, boost::asio::yield_context ctx)
{
auto static fetchTxType = [this]() {
return handle_.prepare(fmt::format(
R"(
SELECT tx_type FROM {} WHERE hash = ?
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
));
}();
auto const res = executor_.read(ctx, fetchTxType.bind(hash));
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [txType] : data::cassandra::extract<std::string>(result)) {
return txType;
}
return std::nullopt;
}
/**
* @brief Fetch the transaction index table size. It's used by ExampleTransactionsMigrator validation.
*
* @param ctx The boost asio context
* @return The size of the transaction index table if found, otherwise std::nullopt
*/
std::optional<std::uint64_t>
fetchTxIndexTableSize(boost::asio::yield_context ctx)
{
auto static insertTxIndexExample = [this]() {
return handle_.prepare(fmt::format(
R"(
SELECT COUNT(*) FROM {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
));
}();
// This function will be called after table being dropped, catch the exception
try {
auto const res = executor_.read(ctx, insertTxIndexExample);
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [size] : data::cassandra::extract<std::uint64_t>(result)) {
return size;
}
} catch (std::exception& e) {
return std::nullopt;
}
return std::nullopt;
}
/**
*@brief Write the ledger account hash to the ledger_example table. It's used by ExampleLedgerMigrator.
*
* @param sequence The ledger sequence
* @param accountHash The account hash
*/
void
writeLedgerAccountHash(std::uint64_t sequence, std::string const& accountHash)
{
auto static insertLedgerExample = [this]() {
return handle_.prepare(fmt::format(
R"(
INSERT INTO {}
(sequence, account_hash)
VALUES (?, ?)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
));
}();
executor_.writeSync(insertLedgerExample.bind(sequence, accountHash));
}
/**
* @brief Fetch the account hash via ledger sequence from the ledger_example table. It's used by
* ExampleLedgerMigrator validation.
*
* @param sequence The ledger sequence
* @param ctx The boost asio context
* @return The account hash if found, otherwise std::nullopt
*/
std::optional<ripple::uint256>
fetchAccountHashViaSequence(std::uint64_t sequence, boost::asio::yield_context ctx)
{
auto static fetchAccountHash = [this]() {
return handle_.prepare(fmt::format(
R"(
SELECT account_hash FROM {} WHERE sequence = ?
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
));
}();
auto const res = executor_.read(ctx, fetchAccountHash.bind(sequence));
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [accountHash] : data::cassandra::extract<ripple::uint256>(result)) {
return accountHash;
}
return std::nullopt;
}
/**
* @brief Fetch the ledger example table size. It's used by ExampleLedgerMigrator validation.
*
* @param ctx The boost asio context
* @return The size of the ledger example table if found, otherwise std::nullopt
*/
std::optional<std::uint64_t>
fetchLedgerTableSize(boost::asio::yield_context ctx)
{
auto static insertLedgerExample = [this]() {
return handle_.prepare(fmt::format(
R"(
SELECT COUNT(*) FROM {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
));
}();
// This function will be called after table being dropped, catch the exception
try {
auto const res = executor_.read(ctx, insertLedgerExample);
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [size] : data::cassandra::extract<std::uint64_t>(result)) {
return size;
}
} catch (std::exception& e) {
return std::nullopt;
}
return std::nullopt;
}
/**
* @brief Drop the diff table. It's used by ExampleDropTableMigrator.
*
* @return The result of the operation
*/
auto
dropDiffTable()
{
return handle_.execute(fmt::format(
R"(
DROP TABLE IF EXISTS {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "diff")
));
}
/**
* @brief Fetch the diff table size. It's used by ExampleDropTableMigrator validation.
*
* @param ctx The boost asio context
* @return The size of the diff table if found, otherwise std::nullopt
*/
std::optional<std::uint64_t>
fetchDiffTableSize(boost::asio::yield_context ctx)
{
auto static countDiff = [this]() {
return handle_.prepare(fmt::format(
R"(
SELECT COUNT(*) FROM {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "diff")
));
}();
// This function will be called after table being dropped, catch the exception
try {
auto const res = executor_.read(ctx, countDiff);
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [size] : data::cassandra::extract<std::uint64_t>(result)) {
return size;
}
} catch (std::exception& e) {
return std::nullopt;
}
return std::nullopt;
}
private:
std::vector<data::cassandra::Statement>
createTablesSchema()
{
std::vector<data::cassandra::Statement> statements;
statements.emplace_back(fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
hash blob,
tx_type text,
PRIMARY KEY (hash)
)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
));
statements.emplace_back(fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
sequence bigint,
account_hash blob,
PRIMARY KEY (sequence)
)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
));
return statements;
}
};

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,29 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <array>
#include <string>
extern std::array<std::string, 100> TransactionsRawData;
extern std::array<std::string, 100> ObjectsRawData;
extern std::array<std::string, 50> LedgerHeaderRawData;

View File

@@ -0,0 +1,30 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "migration/cassandra/ExampleDropTableMigrator.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <memory>
void
ExampleDropTableMigrator::runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const&)
{
backend->dropDiffTable();
}

View File

@@ -0,0 +1,39 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "migration/cassandra/CassandraMigrationTestBackend.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <memory>
/**
* @brief Example migrator for dropping the table. In this example, our migrator will drop the table. The table removal
* is not reversible.
*/
struct ExampleDropTableMigrator {
using Backend = CassandraMigrationTestBackend;
static constexpr char const* name = "ExampleDropTableMigrator";
static constexpr char const* description = "The migrator for dropping the table";
static void
runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const& config);
};

View File

@@ -0,0 +1,49 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "migration/cassandra/ExampleLedgerMigrator.hpp"
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "util/Assert.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <memory>
void
ExampleLedgerMigrator::runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const&)
{
auto const range =
data::synchronous([&](boost::asio::yield_context yield) { return backend->hardFetchLedgerRange(yield); });
if (!range.has_value())
return;
data::synchronous([&](boost::asio::yield_context yield) {
for (auto seq = range->minSequence; seq <= range->maxSequence; seq++) {
auto const ledgerHeader = backend->fetchLedgerBySequence(seq, yield);
ASSERT(ledgerHeader.has_value(), "Can not find the sequence: {}", seq);
backend->writeLedgerAccountHash(seq, uint256ToString(ledgerHeader->accountHash));
}
});
}

View File

@@ -0,0 +1,41 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "migration/cassandra/CassandraMigrationTestBackend.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <memory>
/**
* @brief Example migrator for the ledgers table. In this example, we show how to migrate the data from table without
* full table scan. We create an index table called "ledger_example" which maintains the map of ledger
* sequence to account hash. Because ledger sequence is the partition key of ledgers table, we can just fetch the data
* via ledger sequence without full table scan.
*/
struct ExampleLedgerMigrator {
static constexpr char const* name = "ExampleLedgerMigrator";
static constexpr char const* description = "The migrator for ledgers table";
using Backend = CassandraMigrationTestBackend;
static void
runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const& config);
};

View File

@@ -0,0 +1,65 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "migration/cassandra/ExampleObjectsMigrator.hpp"
#include "migration/cassandra/impl/ObjectsAdapter.hpp"
#include "migration/cassandra/impl/Types.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <xrpl/basics/base_uint.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/STLedgerEntry.h>
#include <atomic>
#include <cstdint>
#include <memory>
#include <optional>
#include <unordered_set>
std::atomic_int64_t ExampleObjectsMigrator::count;
std::atomic_int64_t ExampleObjectsMigrator::accountCount;
void
ExampleObjectsMigrator::runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const& config)
{
auto const ctxFullScanThreads = config.get<std::uint32_t>("full_scan_threads");
auto const jobsFullScan = config.get<std::uint32_t>("full_scan_jobs");
auto const cursorPerJobsFullScan = config.get<std::uint32_t>("cursors_per_job");
std::unordered_set<ripple::uint256> idx;
migration::cassandra::impl::ObjectsScanner scaner(
{.ctxThreadsNum = ctxFullScanThreads, .jobsNum = jobsFullScan, .cursorsPerJob = cursorPerJobsFullScan},
migration::cassandra::impl::ObjectsAdapter(
backend,
[&](std::uint32_t, std::optional<ripple::SLE> sle) {
if (sle.has_value()) {
if (sle->getType() == ripple::ltACCOUNT_ROOT) {
if (!idx.contains(sle->key())) {
ExampleObjectsMigrator::accountCount++;
}
}
idx.insert(sle->key());
ExampleObjectsMigrator::count++;
}
}
)
);
scaner.wait();
}

View File

@@ -0,0 +1,46 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "migration/cassandra/CassandraMigrationTestBackend.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <xrpl/protocol/STLedgerEntry.h>
#include <xrpl/protocol/STObject.h>
#include <atomic>
#include <memory>
/**
* @brief Example migrator for the objects table. In this example, we show how to traverse objects table.
* We will count the number of account root in the objects table.
*/
struct ExampleObjectsMigrator {
using Backend = CassandraMigrationTestBackend;
static constexpr char const* name = "ExampleObjectsMigrator";
static constexpr char const* description = "The migrator for objects table";
static std::atomic_int64_t count;
static std::atomic_int64_t accountCount;
static void
runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const& config);
};

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "migration/cassandra/ExampleTransactionsMigrator.hpp"
#include "data/DBHelpers.hpp"
#include "migration/cassandra/impl/TransactionsAdapter.hpp"
#include "migration/cassandra/impl/Types.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <xrpl/basics/base_uint.h>
#include <xrpl/protocol/STBase.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_set>
std::uint64_t ExampleTransactionsMigrator::count;
void
ExampleTransactionsMigrator::runMigration(
std::shared_ptr<Backend> const& backend,
util::config::ObjectView const& config
)
{
auto const ctxFullScanThreads = config.get<std::uint32_t>("full_scan_threads");
auto const jobsFullScan = config.get<std::uint32_t>("full_scan_jobs");
auto const cursorPerJobsFullScan = config.get<std::uint32_t>("cursors_per_job");
std::unordered_set<std::string> hashSet;
std::mutex mtx; // protect hashSet
migration::cassandra::impl::TransactionsScanner scaner(
{.ctxThreadsNum = ctxFullScanThreads, .jobsNum = jobsFullScan, .cursorsPerJob = cursorPerJobsFullScan},
migration::cassandra::impl::TransactionsAdapter(
backend,
[&](ripple::STTx const& tx, ripple::TxMeta const&) {
{
std::lock_guard<std::mutex> lock(mtx);
hashSet.insert(ripple::to_string(tx.getTransactionID()));
}
auto const json = tx.getJson(ripple::JsonOptions::none);
auto const txType = json["TransactionType"].asString();
backend->writeTxIndexExample(uint256ToString(tx.getTransactionID()), txType);
}
)
);
scaner.wait();
count = hashSet.size();
}

View File

@@ -0,0 +1,41 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022-2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "migration/cassandra/CassandraMigrationTestBackend.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <cstdint>
#include <memory>
/**
* @brief Example migrator for the transactions table. In this example, we show how to traverse the transactions table
* and migrate the data to index table. We create an index table for transaction hash to transaction type string.
*/
struct ExampleTransactionsMigrator {
static constexpr char const* name = "ExampleTransactionsMigrator";
static constexpr char const* description = "The migrator for transactions table";
using Backend = CassandraMigrationTestBackend;
static std::uint64_t count;
static void
runMigration(std::shared_ptr<Backend> const& backend, util::config::ObjectView const& config);
};