Files
clio/tests/integration/migration/cassandra/CassandraMigrationTestBackend.hpp
2026-03-24 15:25:32 +00:00

352 lines
11 KiB
C++

#pragma once
#include "data/LedgerCacheInterface.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Schema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "migration/cassandra/CassandraMigrationBackend.hpp"
#include <boost/asio/spawn.hpp>
#include <fmt/format.h>
#include <xrpl/basics/base_uint.h>
#include <cstdint>
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
/**
* @brief Test backend for Cassandra migration. The class is mainly to provide an example of how to
* add the needed backend for the migrator. It is used in integration tests to provide the backend
* for the example migrators. In production, the backend code should be added to
* CassandraMigrationBackend directly.
*/
class CassandraMigrationTestBackend : public migration::cassandra::CassandraMigrationBackend {
data::cassandra::SettingsProvider settingsProvider_;
public:
/**
* @brief Construct a new Cassandra Migration Test Backend object
*
* @param settingsProvider The settings provider for the Cassandra backend
* @param cache The ledger cache to use
*/
CassandraMigrationTestBackend(
data::cassandra::SettingsProvider settingsProvider,
data::LedgerCacheInterface& cache
)
: migration::cassandra::CassandraMigrationBackend(settingsProvider, cache)
, settingsProvider_(std::move(settingsProvider))
{
if (auto const res = handle_.executeEach(createTablesSchema()); not res)
throw std::runtime_error("Could not create schema: " + res.error());
}
/**
* @brief Write a transaction hash and its transaction type to the tx_index_example table. It's
* used by ExampleTransactionsMigrator.
*
* @param hash The transaction hash
* @param txType The transaction type
*/
void
writeTxIndexExample(std::string const& hash, std::string const& txType)
{
static auto kINSERT_TX_INDEX_EXAMPLE = [this]() {
return handle_.prepare(
fmt::format(
R"(
INSERT INTO {}
(hash, tx_type)
VALUES (?, ?)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
)
);
}();
executor_.writeSync(kINSERT_TX_INDEX_EXAMPLE.bind(hash, data::cassandra::Text(txType)));
}
/**
* @brief Fetch the transaction type via transaction hash from the tx_index_example table. It's
* used by ExampleTransactionsMigrator validation.
*
* @param hash The transaction hash
* @param ctx The boost asio context
* @return The transaction type if found, otherwise std::nullopt
*/
std::optional<std::string>
fetchTxTypeViaID(std::string const& hash, boost::asio::yield_context ctx)
{
static auto kFETCH_TX_TYPE = [this]() {
return handle_.prepare(
fmt::format(
R"(
SELECT tx_type FROM {} WHERE hash = ?
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
)
);
}();
auto const res = executor_.read(ctx, kFETCH_TX_TYPE.bind(hash));
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [txType] : data::cassandra::extract<std::string>(result)) {
return txType;
}
return std::nullopt;
}
/**
* @brief Fetch the transaction index table size. It's used by ExampleTransactionsMigrator
* validation.
*
* @param ctx The boost asio context
* @return The size of the transaction index table if found, otherwise std::nullopt
*/
std::optional<std::uint64_t>
fetchTxIndexTableSize(boost::asio::yield_context ctx)
{
static auto kINSERT_TX_INDEX_EXAMPLE = [this]() {
return handle_.prepare(
fmt::format(
R"(
SELECT COUNT(*) FROM {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
)
);
}();
// This function will be called after table being dropped, catch the exception
try {
auto const res = executor_.read(ctx, kINSERT_TX_INDEX_EXAMPLE);
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [size] : data::cassandra::extract<std::uint64_t>(result)) {
return size;
}
} catch (std::exception& e) {
return std::nullopt;
}
return std::nullopt;
}
/**
*@brief Write the ledger account hash to the ledger_example table. It's used by
* ExampleLedgerMigrator.
*
* @param sequence The ledger sequence
* @param accountHash The account hash
*/
void
writeLedgerAccountHash(std::uint64_t sequence, std::string const& accountHash)
{
static auto kINSERT_LEDGER_EXAMPLE = [this]() {
return handle_.prepare(
fmt::format(
R"(
INSERT INTO {}
(sequence, account_hash)
VALUES (?, ?)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
)
);
}();
executor_.writeSync(kINSERT_LEDGER_EXAMPLE.bind(sequence, accountHash));
}
/**
* @brief Fetch the account hash via ledger sequence from the ledger_example table. It's used by
* ExampleLedgerMigrator validation.
*
* @param sequence The ledger sequence
* @param ctx The boost asio context
* @return The account hash if found, otherwise std::nullopt
*/
std::optional<ripple::uint256>
fetchAccountHashViaSequence(std::uint64_t sequence, boost::asio::yield_context ctx)
{
static auto kFETCH_ACCOUNT_HASH = [this]() {
return handle_.prepare(
fmt::format(
R"(
SELECT account_hash FROM {} WHERE sequence = ?
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
)
);
}();
auto const res = executor_.read(ctx, kFETCH_ACCOUNT_HASH.bind(sequence));
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [accountHash] : data::cassandra::extract<ripple::uint256>(result)) {
return accountHash;
}
return std::nullopt;
}
/**
* @brief Fetch the ledger example table size. It's used by ExampleLedgerMigrator validation.
*
* @param ctx The boost asio context
* @return The size of the ledger example table if found, otherwise std::nullopt
*/
std::optional<std::uint64_t>
fetchLedgerTableSize(boost::asio::yield_context ctx)
{
static auto kINSERT_LEDGER_EXAMPLE = [this]() {
return handle_.prepare(
fmt::format(
R"(
SELECT COUNT(*) FROM {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
)
);
}();
// This function will be called after table being dropped, catch the exception
try {
auto const res = executor_.read(ctx, kINSERT_LEDGER_EXAMPLE);
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [size] : data::cassandra::extract<std::uint64_t>(result)) {
return size;
}
} catch (std::exception& e) {
return std::nullopt;
}
return std::nullopt;
}
/**
* @brief Drop the diff table. It's used by ExampleDropTableMigrator.
*
* @return The result of the operation
*/
auto
dropDiffTable()
{
return handle_.execute(
fmt::format(
R"(
DROP TABLE IF EXISTS {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "diff")
)
);
}
/**
* @brief Fetch the diff table size. It's used by ExampleDropTableMigrator validation.
*
* @param ctx The boost asio context
* @return The size of the diff table if found, otherwise std::nullopt
*/
std::optional<std::uint64_t>
fetchDiffTableSize(boost::asio::yield_context ctx)
{
static auto kCOUNT_DIFF = [this]() {
return handle_.prepare(
fmt::format(
R"(
SELECT COUNT(*) FROM {}
)",
data::cassandra::qualifiedTableName(settingsProvider_, "diff")
)
);
}();
// This function will be called after table being dropped, catch the exception
try {
auto const res = executor_.read(ctx, kCOUNT_DIFF);
if (not res) {
return std::nullopt;
}
auto const& result = res.value();
if (not result.hasRows()) {
return std::nullopt;
}
for (auto const& [size] : data::cassandra::extract<std::uint64_t>(result)) {
return size;
}
} catch (std::exception& e) {
return std::nullopt;
}
return std::nullopt;
}
private:
std::vector<data::cassandra::Statement>
createTablesSchema()
{
std::vector<data::cassandra::Statement> statements;
statements.emplace_back(
fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
hash blob,
tx_type text,
PRIMARY KEY (hash)
)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "tx_index_example")
)
);
statements.emplace_back(
fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
sequence bigint,
account_hash blob,
PRIMARY KEY (sequence)
)
)",
data::cassandra::qualifiedTableName(settingsProvider_, "ledger_example")
)
);
return statements;
}
};