Merge branch 'develop' into release/2.6.0

Alex Kremer
2025-10-03 16:30:41 +01:00
22 changed files with 1823 additions and 1023 deletions

View File

@@ -37,13 +37,13 @@ repos:
exclude: LICENSE.md
- repo: https://github.com/hadolint/hadolint
rev: c3dc18df7a501f02a560a2cc7ba3c69a85ca01d3 # frozen: v2.13.1-beta
rev: 4e697ba704fd23b2409b947a319c19c3ee54d24f # frozen: v2.14.0
hooks:
- id: hadolint-docker
# hadolint-docker is a special hook that runs hadolint in a Docker container.
# Since Docker is not installed in the environment where pre-commit runs, this hook is manual-only.
stages: [manual]
entry: hadolint/hadolint:v2.12.1-beta hadolint
entry: hadolint/hadolint:v2.14 hadolint
- repo: https://github.com/codespell-project/codespell
rev: 63c8f8312b7559622c0d82815639671ae42132ac # frozen: v2.4.1
@@ -80,7 +80,7 @@ repos:
language: script
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: 86fdcc9bd34d6afbbd29358b97436c8ffe3aa3b2 # frozen: v21.1.0
rev: 719856d56a62953b8d2839fb9e851f25c3cfeef8 # frozen: v21.1.2
hooks:
- id: clang-format
args: [--style=file]

View File

@@ -89,6 +89,14 @@ This document provides a list of all available Clio configuration properties in
- **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
- **Description**: Represents the number of threads that will be used for database operations.
### database.cassandra.provider
- **Required**: True
- **Type**: string
- **Default value**: `cassandra`
- **Constraints**: The value must be one of the following: `cassandra`, `aws_keyspace`.
- **Description**: The database backend provider to use: `cassandra` for Apache Cassandra/ScyllaDB or `aws_keyspace` for Amazon Keyspaces.
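
For example (an illustrative snippet, not part of this change), selecting the Amazon Keyspaces provider in Clio's JSON config could look like this; all other required `database.cassandra` settings are omitted:

```json
{
  "database": {
    "type": "cassandra",
    "cassandra": {
      "provider": "aws_keyspace"
    }
  }
}
```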
### database.cassandra.core_connections_per_host
- **Required**: True

View File

@@ -21,6 +21,7 @@
#include "data/BackendInterface.hpp"
#include "data/CassandraBackend.hpp"
#include "data/KeyspaceBackend.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "util/config/ConfigDefinition.hpp"
@@ -55,10 +56,16 @@ makeBackend(util::config::ClioConfigDefinition const& config, data::LedgerCacheI
if (boost::iequals(type, "cassandra")) {
auto const cfg = config.getObject("database." + type);
if (cfg.getValueView("provider").asString() == toString(cassandra::impl::Provider::Keyspace)) {
backend = std::make_shared<data::cassandra::KeyspaceBackend>(
data::cassandra::SettingsProvider{cfg}, cache, readOnly
);
} else {
backend = std::make_shared<data::cassandra::CassandraBackend>(
data::cassandra::SettingsProvider{cfg}, cache, readOnly
);
}
}
if (!backend)
throw std::runtime_error("Invalid database type");

View File

@@ -295,7 +295,7 @@ public:
* @param account The account to fetch transactions for
* @param limit The maximum number of transactions per result page
* @param forward Whether to fetch the page forwards or backwards from the given cursor
* @param cursor The cursor to resume fetching from
* @param txnCursor The cursor to resume fetching from
* @param yield The coroutine context
* @return Results and a cursor to resume from
*/
@@ -304,7 +304,7 @@ public:
ripple::AccountID const& account,
std::uint32_t limit,
bool forward,
std::optional<TransactionsCursor> const& cursor,
std::optional<TransactionsCursor> const& txnCursor,
boost::asio::yield_context yield
) const = 0;
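A usage sketch (illustrative only, not part of this diff): a caller inside a Boost.Asio coroutine can page backwards through an account's history until the returned cursor is empty. `backend`, `ioc`, and `account` are assumed to exist.
// Hypothetical usage: page backwards (newest first) through an account's transactions.
// Assumes `backend` is a std::shared_ptr<BackendInterface>, `ioc` a boost::asio::io_context,
// and `account` a valid ripple::AccountID.
boost::asio::spawn(ioc, [backend, account](boost::asio::yield_context yield) {
    std::optional<TransactionsCursor> cursor;  // empty cursor = start from the newest ledger
    do {
        auto const page = backend->fetchAccountTransactions(account, 50u, /* forward = */ false, cursor, yield);
        // ... process page.txns ...
        cursor = page.cursor;  // an empty cursor signals the last page
    } while (cursor.has_value());
});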

View File

@@ -19,20 +19,15 @@
#pragma once
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "data/LedgerHeaderCache.hpp"
#include "data/Types.hpp"
#include "data/cassandra/CassandraBackendFamily.hpp"
#include "data/cassandra/CassandraSchema.hpp"
#include "data/cassandra/Concepts.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Schema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "data/cassandra/impl/ExecutionStrategy.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/spawn.hpp>
@@ -50,27 +45,18 @@
#include <xrpl/protocol/nft.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <limits>
#include <optional>
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
class CacheBackendCassandraTest;
namespace data::cassandra {
/**
* @brief Implements @ref BackendInterface for Cassandra/ScyllaDB.
*
* Note: This is a safer and more correct rewrite of the original implementation of the backend.
* @brief Implements @ref CassandraBackendFamily for Cassandra/ScyllaDB.
*
* @tparam SettingsProviderType The settings provider type to use
* @tparam ExecutionStrategyType The execution strategy type to use
@@ -80,156 +66,46 @@ template <
SomeSettingsProvider SettingsProviderType,
SomeExecutionStrategy ExecutionStrategyType,
typename FetchLedgerCacheType = FetchLedgerCache>
class BasicCassandraBackend : public BackendInterface {
util::Logger log_{"Backend"};
SettingsProviderType settingsProvider_;
Schema<SettingsProviderType> schema_;
std::atomic_uint32_t ledgerSequence_ = 0u;
friend class ::CacheBackendCassandraTest;
class BasicCassandraBackend : public CassandraBackendFamily<
SettingsProviderType,
ExecutionStrategyType,
CassandraSchema<SettingsProviderType>,
FetchLedgerCacheType> {
using DefaultCassandraFamily = CassandraBackendFamily<
SettingsProviderType,
ExecutionStrategyType,
CassandraSchema<SettingsProviderType>,
FetchLedgerCacheType>;
// protected because CassandraMigrationBackend inherits from this class
protected:
Handle handle_;
// have to be mutable because BackendInterface constness :(
mutable ExecutionStrategyType executor_;
// TODO: move to interface level
mutable FetchLedgerCacheType ledgerCache_{};
using DefaultCassandraFamily::executor_;
using DefaultCassandraFamily::ledgerSequence_;
using DefaultCassandraFamily::log_;
using DefaultCassandraFamily::range_;
using DefaultCassandraFamily::schema_;
public:
/**
* @brief Create a new cassandra/scylla backend instance.
*
* @param settingsProvider The settings provider to use
* @param cache The ledger cache to use
* @param readOnly Whether the database should be in readonly mode
* @brief Inherit the constructors of the base class.
*/
BasicCassandraBackend(SettingsProviderType settingsProvider, data::LedgerCacheInterface& cache, bool readOnly)
: BackendInterface(cache)
, settingsProvider_{std::move(settingsProvider)}
, schema_{settingsProvider_}
, handle_{settingsProvider_.getSettings()}
, executor_{settingsProvider_.getSettings(), handle_}
{
if (auto const res = handle_.connect(); not res)
throw std::runtime_error("Could not connect to database: " + res.error());
if (not readOnly) {
if (auto const res = handle_.execute(schema_.createKeyspace); not res) {
// on datastax, creation of keyspaces can be configured to only be done thru the admin
// interface. this does not mean that the keyspace does not already exist tho.
if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
throw std::runtime_error("Could not create keyspace: " + res.error());
}
if (auto const res = handle_.executeEach(schema_.createSchema); not res)
throw std::runtime_error("Could not create schema: " + res.error());
}
try {
schema_.prepareStatements(handle_);
} catch (std::runtime_error const& ex) {
auto const error = fmt::format(
"Failed to prepare the statements: {}; readOnly: {}. ReadOnly should be turned off or another Clio "
"node with write access to DB should be started first.",
ex.what(),
readOnly
);
LOG(log_.error()) << error;
throw std::runtime_error(error);
}
LOG(log_.info()) << "Created (revamped) CassandraBackend";
}
using DefaultCassandraFamily::DefaultCassandraFamily;
/**
* @brief Move constructor is deleted because handle_ is shared by reference with executor
*/
BasicCassandraBackend(BasicCassandraBackend&&) = delete;
TransactionsAndCursor
fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t const limit,
bool forward,
std::optional<TransactionsCursor> const& cursorIn,
boost::asio::yield_context yield
) const override
{
auto rng = fetchLedgerRange();
if (!rng)
return {.txns = {}, .cursor = {}};
Statement const statement = [this, forward, &account]() {
if (forward)
return schema_->selectAccountTxForward.bind(account);
return schema_->selectAccountTx.bind(account);
}();
auto cursor = cursorIn;
if (cursor) {
statement.bindAt(1, cursor->asTuple());
LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence
<< cursor->transactionIndex;
} else {
auto const seq = forward ? rng->minSequence : rng->maxSequence;
auto const placeHolder = forward ? 0u : std::numeric_limits<std::uint32_t>::max();
statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq
<< " tuple = " << placeHolder;
}
// FIXME: Limit is a hack to support uint32_t properly for the time
// being. Should be removed later and schema updated to use proper
// types.
statement.bindAt(2, Limit{limit});
auto const res = executor_.read(yield, statement);
auto const& results = res.value();
if (not results.hasRows()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
std::vector<ripple::uint256> hashes = {};
auto numRows = results.numRows();
LOG(log_.info()) << "num_rows = " << numRows;
for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
hashes.push_back(hash);
if (--numRows == 0) {
LOG(log_.debug()) << "Setting cursor";
cursor = data;
}
}
auto const txns = fetchTransactions(hashes, yield);
LOG(log_.debug()) << "Txns = " << txns.size();
if (txns.size() == limit) {
LOG(log_.debug()) << "Returning cursor";
return {txns, cursor};
}
return {txns, {}};
}
void
waitForWritesToFinish() override
{
executor_.sync();
}
bool
doFinishWrites() override
{
waitForWritesToFinish();
this->waitForWritesToFinish();
if (!range_) {
executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
}
if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_;
return false;
}
@@ -238,271 +114,6 @@ public:
return true;
}
void
writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override
{
executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob));
executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq);
ledgerSequence_ = ledgerHeader.seq;
}
std::optional<std::uint32_t>
fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
{
if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res) {
if (auto const& result = res.value(); result) {
if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
return maybeValue;
LOG(log_.error()) << "Could not fetch latest ledger - no rows";
return std::nullopt;
}
LOG(log_.error()) << "Could not fetch latest ledger - no result";
} else {
LOG(log_.error()) << "Could not fetch latest ledger: " << res.error();
}
return std::nullopt;
}
std::optional<ripple::LedgerHeader>
fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override
{
if (auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence)
return lock->ledger;
auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence);
if (res) {
if (auto const& result = res.value(); result) {
if (auto const maybeValue = result.template get<std::vector<unsigned char>>(); maybeValue) {
auto const header = util::deserializeHeader(ripple::makeSlice(*maybeValue));
ledgerCache_.put(FetchLedgerCache::CacheEntry{header, sequence});
return header;
}
LOG(log_.error()) << "Could not fetch ledger by sequence - no rows";
return std::nullopt;
}
LOG(log_.error()) << "Could not fetch ledger by sequence - no result";
} else {
LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error();
}
return std::nullopt;
}
std::optional<ripple::LedgerHeader>
fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
{
if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) {
if (auto const& result = res.value(); result) {
if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
return fetchLedgerBySequence(*maybeValue, yield);
LOG(log_.error()) << "Could not fetch ledger by hash - no rows";
return std::nullopt;
}
LOG(log_.error()) << "Could not fetch ledger by hash - no result";
} else {
LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error();
}
return std::nullopt;
}
std::optional<LedgerRange>
hardFetchLedgerRange(boost::asio::yield_context yield) const override
{
auto const res = executor_.read(yield, schema_->selectLedgerRange);
if (res) {
auto const& results = res.value();
if (not results.hasRows()) {
LOG(log_.debug()) << "Could not fetch ledger range - no rows";
return std::nullopt;
}
// TODO: this is probably a good place to use user type in
// cassandra instead of having two rows with bool flag. or maybe at
// least use tuple<int, int>?
LedgerRange range;
std::size_t idx = 0;
for (auto [seq] : extract<uint32_t>(results)) {
if (idx == 0) {
range.maxSequence = range.minSequence = seq;
} else if (idx == 1) {
range.maxSequence = seq;
}
++idx;
}
if (range.minSequence > range.maxSequence)
std::swap(range.minSequence, range.maxSequence);
LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":"
<< range.maxSequence;
return range;
}
LOG(log_.error()) << "Could not fetch ledger range: " << res.error();
return std::nullopt;
}
std::vector<TransactionAndMetadata>
fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
{
auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield);
return fetchTransactions(hashes, yield);
}
std::vector<ripple::uint256>
fetchAllTransactionHashesInLedger(
std::uint32_t const ledgerSequence,
boost::asio::yield_context yield
) const override
{
auto start = std::chrono::system_clock::now();
auto const res = executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence);
if (not res) {
LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error();
return {};
}
auto const& result = res.value();
if (not result.hasRows()) {
LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = "
<< std::to_string(ledgerSequence);
return {};
}
std::vector<ripple::uint256> hashes;
for (auto [hash] : extract<ripple::uint256>(result))
hashes.push_back(std::move(hash));
auto end = std::chrono::system_clock::now();
LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from database in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< " milliseconds";
return hashes;
}
std::optional<NFT>
fetchNFT(
ripple::uint256 const& tokenID,
std::uint32_t const ledgerSequence,
boost::asio::yield_context yield
) const override
{
auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence);
if (not res)
return std::nullopt;
if (auto const maybeRow = res->template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
auto [seq, owner, isBurned] = *maybeRow;
auto result = std::make_optional<NFT>(tokenID, seq, owner, isBurned);
// now fetch URI. Usually we will have the URI even for burned NFTs,
// but if the first ledger on this clio included NFTokenBurn
// transactions we will not have the URIs for any of those tokens.
// In any other case not having the URI indicates something went
// wrong with our data.
//
// TODO - in the future would be great for any handlers that use
// this could inject a warning in this case (the case of not having
// a URI because it was burned in the first ledger) to indicate that
// even though we are returning a blank URI, the NFT might have had
// one.
auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence);
if (uriRes) {
if (auto const maybeUri = uriRes->template get<ripple::Blob>(); maybeUri)
result->uri = *maybeUri;
}
return result;
}
LOG(log_.error()) << "Could not fetch NFT - no rows";
return std::nullopt;
}
TransactionsAndCursor
fetchNFTTransactions(
ripple::uint256 const& tokenID,
std::uint32_t const limit,
bool const forward,
std::optional<TransactionsCursor> const& cursorIn,
boost::asio::yield_context yield
) const override
{
auto rng = fetchLedgerRange();
if (!rng)
return {.txns = {}, .cursor = {}};
Statement const statement = [this, forward, &tokenID]() {
if (forward)
return schema_->selectNFTTxForward.bind(tokenID);
return schema_->selectNFTTx.bind(tokenID);
}();
auto cursor = cursorIn;
if (cursor) {
statement.bindAt(1, cursor->asTuple());
LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence
<< cursor->transactionIndex;
} else {
auto const seq = forward ? rng->minSequence : rng->maxSequence;
auto const placeHolder = forward ? 0 : std::numeric_limits<std::uint32_t>::max();
statement.bindAt(1, std::make_tuple(placeHolder, placeHolder));
LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq
<< " tuple = " << placeHolder;
}
statement.bindAt(2, Limit{limit});
auto const res = executor_.read(yield, statement);
auto const& results = res.value();
if (not results.hasRows()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
std::vector<ripple::uint256> hashes = {};
auto numRows = results.numRows();
LOG(log_.info()) << "num_rows = " << numRows;
for (auto [hash, data] : extract<ripple::uint256, std::tuple<uint32_t, uint32_t>>(results)) {
hashes.push_back(hash);
if (--numRows == 0) {
LOG(log_.debug()) << "Setting cursor";
cursor = data;
// forward queries by ledger/tx sequence `>=`
// so we have to advance the index by one
if (forward)
++cursor->transactionIndex;
}
}
auto const txns = fetchTransactions(hashes, yield);
LOG(log_.debug()) << "NFT Txns = " << txns.size();
if (txns.size() == limit) {
LOG(log_.debug()) << "Returning cursor";
return {txns, cursor};
}
return {txns, {}};
}
NFTsAndCursor
fetchNFTsByIssuer(
ripple::AccountID const& issuer,
@@ -589,208 +200,6 @@ public:
return ret;
}
MPTHoldersAndCursor
fetchMPTHolders(
ripple::uint192 const& mptID,
std::uint32_t const limit,
std::optional<ripple::AccountID> const& cursorIn,
std::uint32_t const ledgerSequence,
boost::asio::yield_context yield
) const override
{
auto const holderEntries = executor_.read(
yield, schema_->selectMPTHolders, mptID, cursorIn.value_or(ripple::AccountID(0)), Limit{limit}
);
auto const& holderResults = holderEntries.value();
if (not holderResults.hasRows()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
std::vector<ripple::uint256> mptKeys;
std::optional<ripple::AccountID> cursor;
for (auto const [holder] : extract<ripple::AccountID>(holderResults)) {
mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key);
cursor = holder;
}
auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield);
auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) { return mpt.empty(); });
mptObjects.erase(it, mptObjects.end());
ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit");
if (mptKeys.size() == limit)
return {mptObjects, cursor};
return {mptObjects, {}};
}
std::optional<Blob>
doFetchLedgerObject(
ripple::uint256 const& key,
std::uint32_t const sequence,
boost::asio::yield_context yield
) const override
{
LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
if (auto const result = res->template get<Blob>(); result) {
if (result->size())
return result;
} else {
LOG(log_.debug()) << "Could not fetch ledger object - no rows";
}
} else {
LOG(log_.error()) << "Could not fetch ledger object: " << res.error();
}
return std::nullopt;
}
std::optional<std::uint32_t>
doFetchLedgerObjectSeq(
ripple::uint256 const& key,
std::uint32_t const sequence,
boost::asio::yield_context yield
) const override
{
LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key);
if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) {
if (auto const result = res->template get<Blob, std::uint32_t>(); result) {
auto [_, seq] = result.value();
return seq;
}
LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows";
} else {
LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error();
}
return std::nullopt;
}
std::optional<TransactionAndMetadata>
fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override
{
if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) {
if (auto const maybeValue = res->template get<Blob, Blob, uint32_t, uint32_t>(); maybeValue) {
auto [transaction, meta, seq, date] = *maybeValue;
return std::make_optional<TransactionAndMetadata>(transaction, meta, seq, date);
}
LOG(log_.debug()) << "Could not fetch transaction - no rows";
} else {
LOG(log_.error()) << "Could not fetch transaction: " << res.error();
}
return std::nullopt;
}
std::optional<ripple::uint256>
doFetchSuccessorKey(
ripple::uint256 key,
std::uint32_t const ledgerSequence,
boost::asio::yield_context yield
) const override
{
if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) {
if (auto const result = res->template get<ripple::uint256>(); result) {
if (*result == kLAST_KEY)
return std::nullopt;
return result;
}
LOG(log_.debug()) << "Could not fetch successor - no rows";
} else {
LOG(log_.error()) << "Could not fetch successor: " << res.error();
}
return std::nullopt;
}
std::vector<TransactionAndMetadata>
fetchTransactions(std::vector<ripple::uint256> const& hashes, boost::asio::yield_context yield) const override
{
if (hashes.empty())
return {};
auto const numHashes = hashes.size();
std::vector<TransactionAndMetadata> results;
results.reserve(numHashes);
std::vector<Statement> statements;
statements.reserve(numHashes);
auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() {
// TODO: seems like a job for "hash IN (list of hashes)" instead?
std::transform(
std::cbegin(hashes), std::cend(hashes), std::back_inserter(statements), [this](auto const& hash) {
return schema_->selectTransaction.bind(hash);
}
);
auto const entries = executor_.readEach(yield, statements);
std::transform(
std::cbegin(entries),
std::cend(entries),
std::back_inserter(results),
[](auto const& res) -> TransactionAndMetadata {
if (auto const maybeRow = res.template get<Blob, Blob, uint32_t, uint32_t>(); maybeRow)
return *maybeRow;
return {};
}
);
});
ASSERT(numHashes == results.size(), "Number of hashes and results must match");
LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in " << timeDiff
<< " milliseconds";
return results;
}
std::vector<Blob>
doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
std::uint32_t const sequence,
boost::asio::yield_context yield
) const override
{
if (keys.empty())
return {};
auto const numKeys = keys.size();
LOG(log_.trace()) << "Fetching " << numKeys << " objects";
std::vector<Blob> results;
results.reserve(numKeys);
std::vector<Statement> statements;
statements.reserve(numKeys);
// TODO: seems like a job for "key IN (list of keys)" instead?
std::transform(
std::cbegin(keys), std::cend(keys), std::back_inserter(statements), [this, &sequence](auto const& key) {
return schema_->selectObject.bind(key, sequence);
}
);
auto const entries = executor_.readEach(yield, statements);
std::transform(
std::cbegin(entries), std::cend(entries), std::back_inserter(results), [](auto const& res) -> Blob {
if (auto const maybeValue = res.template get<Blob>(); maybeValue)
return *maybeValue;
return {};
}
);
LOG(log_.trace()) << "Fetched " << numKeys << " objects";
return results;
}
std::vector<ripple::uint256>
fetchAccountRoots(
std::uint32_t number,
@@ -819,7 +228,7 @@ public:
fullAccounts.push_back(ripple::keylet::account(account).key);
lastItem = account;
}
auto const objs = doFetchLedgerObjects(fullAccounts, seq, yield);
auto const objs = this->doFetchLedgerObjects(fullAccounts, seq, yield);
for (auto i = 0u; i < fullAccounts.size(); i++) {
if (not objs[i].empty()) {
@@ -838,284 +247,6 @@ public:
return liveAccounts;
}
std::vector<LedgerObject>
fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override
{
auto const [keys, timeDiff] = util::timed([this, &ledgerSequence, yield]() -> std::vector<ripple::uint256> {
auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence);
if (not res) {
LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence;
return {};
}
auto const& results = res.value();
if (not results) {
LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence;
return {};
}
std::vector<ripple::uint256> resultKeys;
for (auto [key] : extract<ripple::uint256>(results))
resultKeys.push_back(key);
return resultKeys;
});
// one of the above errors must have happened
if (keys.empty())
return {};
LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in " << timeDiff
<< " milliseconds";
auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield);
std::vector<LedgerObject> results;
results.reserve(keys.size());
std::transform(
std::cbegin(keys),
std::cend(keys),
std::cbegin(objs),
std::back_inserter(results),
[](auto const& key, auto const& obj) { return LedgerObject{key, obj}; }
);
return results;
}
std::optional<std::string>
fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const override
{
auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
if (not res) {
LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
return {};
}
auto const& results = res.value();
if (not results) {
return {};
}
for (auto [statusString] : extract<std::string>(results))
return statusString;
return {};
}
std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
fetchClioNodesData(boost::asio::yield_context yield) const override
{
auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
if (not readResult)
return std::unexpected{readResult.error().message()};
std::vector<std::pair<boost::uuids::uuid, std::string>> result;
for (auto [uuid, message] : extract<boost::uuids::uuid, std::string>(*readResult)) {
result.emplace_back(uuid, std::move(message));
}
return result;
}
void
doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
{
LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
if (range_)
executor_.write(schema_->insertDiff, seq, key);
executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
}
void
writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
{
LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
<< " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes.";
ASSERT(!key.empty(), "Key must not be empty");
ASSERT(!successor.empty(), "Successor must not be empty");
executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
}
void
writeAccountTransactions(std::vector<AccountTransactionsData> data) override
{
std::vector<Statement> statements;
statements.reserve(data.size() * 10); // assume 10 transactions avg
for (auto& record : data) {
std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
return schema_->insertAccountTx.bind(
std::forward<decltype(account)>(account),
std::make_tuple(record.ledgerSequence, record.transactionIndex),
record.txHash
);
});
}
executor_.write(std::move(statements));
}
void
writeAccountTransaction(AccountTransactionsData record) override
{
std::vector<Statement> statements;
statements.reserve(record.accounts.size());
std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
return schema_->insertAccountTx.bind(
std::forward<decltype(account)>(account),
std::make_tuple(record.ledgerSequence, record.transactionIndex),
record.txHash
);
});
executor_.write(std::move(statements));
}
void
writeNFTTransactions(std::vector<NFTTransactionsData> const& data) override
{
std::vector<Statement> statements;
statements.reserve(data.size());
std::ranges::transform(data, std::back_inserter(statements), [this](auto const& record) {
return schema_->insertNFTTx.bind(
record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash
);
});
executor_.write(std::move(statements));
}
void
writeTransaction(
std::string&& hash,
std::uint32_t const seq,
std::uint32_t const date,
std::string&& transaction,
std::string&& metadata
) override
{
LOG(log_.trace()) << "Writing txn to database";
executor_.write(schema_->insertLedgerTransaction, seq, hash);
executor_.write(
schema_->insertTransaction, std::move(hash), seq, date, std::move(transaction), std::move(metadata)
);
}
void
writeNFTs(std::vector<NFTsData> const& data) override
{
std::vector<Statement> statements;
statements.reserve(data.size() * 3);
for (NFTsData const& record : data) {
if (!record.onlyUriChanged) {
statements.push_back(
schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
);
// If `uri` is set (and it can be set to an empty uri), we know this
// is a net-new NFT. That is, this NFT has not been seen before by
// us _OR_ it is in the extreme edge case of a re-minted NFT ID with
// the same NFT ID as an already-burned token. In this case, we need
// to record the URI and link to the issuer_nf_tokens table.
if (record.uri) {
statements.push_back(schema_->insertIssuerNFT.bind(
ripple::nft::getIssuer(record.tokenID),
static_cast<uint32_t>(ripple::nft::getTaxon(record.tokenID)),
record.tokenID
));
statements.push_back(
schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
);
}
} else {
// only uri changed, we update the uri table only
statements.push_back(
schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
);
}
}
executor_.writeEach(std::move(statements));
}
void
writeMPTHolders(std::vector<MPTHolderData> const& data) override
{
std::vector<Statement> statements;
statements.reserve(data.size());
for (auto [mptId, holder] : data)
statements.push_back(schema_->insertMPTHolder.bind(mptId, holder));
executor_.write(std::move(statements));
}
void
startWrites() const override
{
// Note: no-op in original implementation too.
// probably was used in PG to start a transaction or smth.
}
void
writeMigratorStatus(std::string const& migratorName, std::string const& status) override
{
executor_.writeSync(
schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status)
);
}
void
writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override
{
executor_.writeSync(schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid);
}
bool
isTooBusy() const override
{
return executor_.isTooBusy();
}
boost::json::object
stats() const override
{
return executor_.stats();
}
private:
bool
executeSyncUpdate(Statement statement)
{
auto const res = executor_.writeSync(statement);
auto maybeSuccess = res->template get<bool>();
if (not maybeSuccess) {
LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
return false;
}
if (not maybeSuccess.value()) {
LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";
// error may indicate that another writer wrote something.
// in this case let's just compare the current state of things
// against what we were trying to write in the first place and
// use that as the source of truth for the result.
auto rng = hardFetchLedgerRangeNoThrow();
return rng && rng->maxSequence == ledgerSequence_;
}
return true;
}
};
using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;

View File

@@ -0,0 +1,308 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/LedgerHeaderCache.hpp"
#include "data/Types.hpp"
#include "data/cassandra/CassandraBackendFamily.hpp"
#include "data/cassandra/Concepts.hpp"
#include "data/cassandra/KeyspaceSchema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "data/cassandra/impl/ExecutionStrategy.hpp"
#include "util/Assert.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <cassandra.h>
#include <fmt/format.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/nft.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <optional>
#include <stdexcept>
#include <utility>
#include <vector>
namespace data::cassandra {
/**
* @brief Implements @ref CassandraBackendFamily for Keyspace
*
* @tparam SettingsProviderType The settings provider type to use
* @tparam ExecutionStrategyType The execution strategy type to use
* @tparam FetchLedgerCacheType The ledger header cache type to use
*/
template <
SomeSettingsProvider SettingsProviderType,
SomeExecutionStrategy ExecutionStrategyType,
typename FetchLedgerCacheType = FetchLedgerCache>
class BasicKeyspaceBackend : public CassandraBackendFamily<
SettingsProviderType,
ExecutionStrategyType,
KeyspaceSchema<SettingsProviderType>,
FetchLedgerCacheType> {
using DefaultCassandraFamily = CassandraBackendFamily<
SettingsProviderType,
ExecutionStrategyType,
KeyspaceSchema<SettingsProviderType>,
FetchLedgerCacheType>;
using DefaultCassandraFamily::executor_;
using DefaultCassandraFamily::ledgerSequence_;
using DefaultCassandraFamily::log_;
using DefaultCassandraFamily::range_;
using DefaultCassandraFamily::schema_;
public:
/**
* @brief Inherit the constructors of the base class.
*/
using DefaultCassandraFamily::DefaultCassandraFamily;
/**
* @brief Move constructor is deleted because handle_ is shared by reference with executor
*/
BasicKeyspaceBackend(BasicKeyspaceBackend&&) = delete;
bool
doFinishWrites() override
{
this->waitForWritesToFinish();
// !range_.has_value() means the table 'ledger_range' is not populated;
// This would be the first write to the table.
// In this case, insert both min_sequence/max_sequence range into the table.
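// For example, after committing the very first ledger N, the table holds (is_latest=false, N) and (is_latest=true, N).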
if (not range_.has_value()) {
executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
}
if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
log_.warn() << "Update failed for ledger " << ledgerSequence_;
return false;
}
log_.info() << "Committed ledger " << ledgerSequence_;
return true;
}
NFTsAndCursor
fetchNFTsByIssuer(
ripple::AccountID const& issuer,
std::optional<std::uint32_t> const& taxon,
std::uint32_t const ledgerSequence,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const override
{
std::vector<ripple::uint256> nftIDs;
if (taxon.has_value()) {
// Keyspaces and ScyllaDB use the same logic for taxon-filtered queries
nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
} else {
// Amazon Keyspaces needs a two-query workflow, implemented in fetchNFTIDsWithoutTaxon
nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield);
}
return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
}
/**
* @brief (Unsupported in Keyspaces) Fetches account root object indexes by page.
* @note Loading the cache by enumerating all accounts is currently unsupported by the AWS Keyspaces backend.
* This function's logic relies on "PER PARTITION LIMIT 1", which Keyspaces does not support, and there is
* no efficient alternative. This is acceptable as the cache is primarily loaded via diffs. Calling this
* function will fail an assertion.
*
* @param number The total number of accounts to fetch.
* @param pageSize The maximum number of accounts per page.
* @param seq The accounts need to exist at this ledger sequence.
* @param yield The coroutine context.
* @return A vector of ripple::uint256 representing the account root hashes.
*/
std::vector<ripple::uint256>
fetchAccountRoots(
[[maybe_unused]] std::uint32_t number,
[[maybe_unused]] std::uint32_t pageSize,
[[maybe_unused]] std::uint32_t seq,
[[maybe_unused]] boost::asio::yield_context yield
) const override
{
ASSERT(false, "Fetching account roots is not supported by the Keyspaces backend.");
std::unreachable();
}
private:
std::vector<ripple::uint256>
fetchNFTIDsByTaxon(
ripple::AccountID const& issuer,
std::uint32_t const taxon,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const
{
std::vector<ripple::uint256> nftIDs;
Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
statement.bindAt(1, taxon);
statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
statement.bindAt(3, Limit{limit});
auto const res = executor_.read(yield, statement);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
return nftIDs;
}
std::vector<ripple::uint256>
fetchNFTIDsWithoutTaxon(
ripple::AccountID const& issuer,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const
{
std::vector<ripple::uint256> nftIDs;
auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
firstQuery.bindAt(1, startTaxon);
firstQuery.bindAt(2, startTokenID);
firstQuery.bindAt(3, Limit{limit});
auto const firstRes = executor_.read(yield, firstQuery);
if (firstRes) {
for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
nftIDs.push_back(nftID);
}
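// The first query exhausted the cursor's taxon; if the limit is not yet filled,
// continue with token IDs from strictly greater taxons.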
if (nftIDs.size() < limit) {
auto const remainingLimit = limit - nftIDs.size();
Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer);
secondQuery.bindAt(1, startTaxon);
secondQuery.bindAt(2, Limit{remainingLimit});
auto const secondRes = executor_.read(yield, secondQuery);
if (secondRes) {
for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
nftIDs.push_back(nftID);
}
}
return nftIDs;
}
/**
* @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor.
*/
NFTsAndCursor
populateNFTsAndCreateCursor(
std::vector<ripple::uint256> const& nftIDs,
std::uint32_t const ledgerSequence,
std::uint32_t const limit,
boost::asio::yield_context yield
) const
{
if (nftIDs.empty()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
NFTsAndCursor ret;
if (nftIDs.size() == limit)
ret.cursor = nftIDs.back();
// Prepare and execute queries to fetch NFT info and URIs in parallel.
std::vector<Statement> selectNFTStatements;
selectNFTStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
return schema_->selectNFT.bind(nftID, ledgerSequence);
}
);
std::vector<Statement> selectNFTURIStatements;
selectNFTURIStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
return schema_->selectNFTURI.bind(nftID, ledgerSequence);
}
);
auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
// Combine the results into final NFT objects.
for (auto i = 0u; i < nftIDs.size(); ++i) {
if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
auto [seq, owner, isBurned] = *maybeRow;
NFT nft(nftIDs[i], seq, owner, isBurned);
if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
nft.uri = *maybeUri;
ret.nfts.push_back(nft);
}
}
return ret;
}
};
using KeyspaceBackend = BasicKeyspaceBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
} // namespace data::cassandra

File diff suppressed because it is too large

View File

@@ -0,0 +1,178 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/cassandra/Concepts.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Schema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "util/log/Logger.hpp"
#include <boost/json/string.hpp>
#include <fmt/compile.h>
#include <functional>
#include <memory>
namespace data::cassandra {
/**
* @brief Manages the DB schema and provides access to prepared statements.
*/
template <SomeSettingsProvider SettingsProviderType>
class CassandraSchema : public Schema<SettingsProviderType> {
public:
using Schema<SettingsProviderType>::Schema;
/**
* @brief Prepared statements holder specific to Cassandra/ScyllaDB.
*/
struct CassandraStatements : public Schema<SettingsProviderType>::Statements {
using Schema<SettingsProviderType>::Statements::Statements;
//
// Update (and "delete") queries
//
PreparedStatement updateLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
UPDATE {}
SET sequence = ?
WHERE is_latest = ?
IF sequence IN (?, null)
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
//
// Select queries
//
PreparedStatement selectNFTIDsByIssuer = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND (taxon, token_id) > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
PreparedStatement selectAccountFromBeginning = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > 0
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
PreparedStatement selectAccountFromToken = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > token(?)
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
PreparedStatement selectLedgerPageKeys = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT key
FROM {}
WHERE TOKEN(key) >= ?
AND sequence <= ?
PER PARTITION LIMIT 1
LIMIT ?
ALLOW FILTERING
)",
qualifiedTableName(settingsProvider_.get(), "objects")
)
);
}();
PreparedStatement selectLedgerPage = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT object, key
FROM {}
WHERE TOKEN(key) >= ?
AND sequence <= ?
PER PARTITION LIMIT 1
LIMIT ?
ALLOW FILTERING
)",
qualifiedTableName(settingsProvider_.get(), "objects")
)
);
}();
};
void
prepareStatements(Handle const& handle) override
{
LOG(log_.info()) << "Preparing cassandra statements";
statements_ = std::make_unique<CassandraStatements>(settingsProvider_, handle);
LOG(log_.info()) << "Finished preparing statements";
}
/**
* @brief Provides access to statements.
*
* @return The statements
*/
std::unique_ptr<CassandraStatements> const&
operator->() const
{
return statements_;
}
private:
std::unique_ptr<CassandraStatements> statements_{nullptr};
};
} // namespace data::cassandra

View File

@@ -0,0 +1,140 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/cassandra/Concepts.hpp"
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Schema.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "util/log/Logger.hpp"
#include <boost/json/string.hpp>
#include <fmt/compile.h>
#include <functional>
#include <memory>
namespace data::cassandra {
/**
* @brief Manages the DB schema and provides access to prepared statements.
*/
template <SomeSettingsProvider SettingsProviderType>
class KeyspaceSchema : public Schema<SettingsProviderType> {
public:
using Schema<SettingsProviderType>::Schema;
/**
* @brief Prepared statements holder specific to AWS Keyspaces.
*/
struct KeyspaceStatements : public Schema<SettingsProviderType>::Statements {
using Schema<SettingsProviderType>::Statements::Statements;
//
// Insert queries
//
PreparedStatement insertLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
//
// Update (and "delete") queries
//
PreparedStatement updateLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
UPDATE {}
SET sequence = ?
WHERE is_latest = ?
IF sequence = ?
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
PreparedStatement selectLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT sequence
FROM {}
WHERE is_latest in (True, False)
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
//
// Select queries
//
PreparedStatement selectNFTsAfterTaxonKeyspaces = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND taxon > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
};
void
prepareStatements(Handle const& handle) override
{
LOG(log_.info()) << "Preparing aws keyspace statements";
statements_ = std::make_unique<KeyspaceStatements>(settingsProvider_, handle);
LOG(log_.info()) << "Finished preparing statements";
}
/**
* @brief Provides access to statements.
*
* @return The statements
*/
std::unique_ptr<KeyspaceStatements> const&
operator->() const
{
return statements_;
}
private:
std::unique_ptr<KeyspaceStatements> statements_{nullptr};
};
} // namespace data::cassandra

View File

@@ -24,10 +24,10 @@
#include "data/cassandra/Types.hpp"
#include "util/log/Logger.hpp"
#include <boost/json/string.hpp>
#include <fmt/compile.h>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
@@ -53,12 +53,15 @@ template <SomeSettingsProvider SettingsProviderType>
*/
template <SomeSettingsProvider SettingsProviderType>
class Schema {
protected:
util::Logger log_{"Backend"};
std::reference_wrapper<SettingsProviderType const> settingsProvider_;
public:
virtual ~Schema() = default;
/**
* @brief Construct a new Schema object
* @brief Schema shared between all schema classes (Cassandra and Keyspace)
*
* @param settingsProvider The settings provider
*/
@@ -334,6 +337,7 @@ public:
* @brief Prepared statements holder.
*/
class Statements {
protected:
std::reference_wrapper<SettingsProviderType const> settingsProvider_;
std::reference_wrapper<Handle const> handle_;
@@ -526,20 +530,6 @@ public:
// Update (and "delete") queries
//
PreparedStatement updateLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
UPDATE {}
SET sequence = ?
WHERE is_latest = ?
IF sequence IN (?, null)
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
PreparedStatement deleteLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -654,40 +644,6 @@ public:
);
}();
PreparedStatement selectLedgerPageKeys = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT key
FROM {}
WHERE TOKEN(key) >= ?
AND sequence <= ?
PER PARTITION LIMIT 1
LIMIT ?
ALLOW FILTERING
)",
qualifiedTableName(settingsProvider_.get(), "objects")
)
);
}();
PreparedStatement selectLedgerPage = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT object, key
FROM {}
WHERE TOKEN(key) >= ?
AND sequence <= ?
PER PARTITION LIMIT 1
LIMIT ?
ALLOW FILTERING
)",
qualifiedTableName(settingsProvider_.get(), "objects")
)
);
}();
PreparedStatement getToken = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -717,36 +673,6 @@ public:
);
}();
PreparedStatement selectAccountFromBeginning = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > 0
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
PreparedStatement selectAccountFromToken = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > token(?)
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
PreparedStatement selectAccountTxForward = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -827,22 +753,6 @@ public:
);
}();
PreparedStatement selectNFTIDsByIssuer = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND (taxon, token_id) > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -960,27 +870,8 @@ public:
*
* @param handle The handle to the DB
*/
void
prepareStatements(Handle const& handle)
{
LOG(log_.info()) << "Preparing cassandra statements";
statements_ = std::make_unique<Statements>(settingsProvider_, handle);
LOG(log_.info()) << "Finished preparing statements";
}
/**
* @brief Provides access to statements.
*
* @return The statements
*/
std::unique_ptr<Statements> const&
operator->() const
{
return statements_;
}
private:
std::unique_ptr<Statements> statements_{nullptr};
virtual void
prepareStatements(Handle const& handle) = 0;
};
} // namespace data::cassandra

View File

@@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const
settings.coreConnectionsPerHost = config_.get<uint32_t>("core_connections_per_host");
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
settings.writeBatchSize = config_.get<std::size_t>("write_batch_size");
settings.provider = config_.get<std::string>("provider");
if (config_.getValueView("connect_timeout").hasValue()) {
auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");

View File

@@ -36,9 +36,18 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
namespace data::cassandra::impl {
// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
/*
* There are two main batch types for Cassandra statements:
* LOGGED: Ensures all updates in the batch succeed together, or none do.
* Use this for critical, related changes (e.g., for the same user), but it is slower.
*
* UNLOGGED: For performance. Sends many separate updates in one network round trip.
* Use this for bulk-loading unrelated data, but note there is NO all-or-nothing guarantee.
*
* More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html
*/
Batch::Batch(std::vector<Statement> const& statements)
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER}
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER}
{
cass_batch_set_is_idempotent(*this, cass_true);
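As a hedged illustration (not part of this change), the trade-off above can be shown by constructing either batch type directly with the DataStax C/C++ driver; `session`, `stmtA`, and `stmtB` are assumed to exist, and error handling is omitted.
// Illustrative sketch of LOGGED vs UNLOGGED batches using the raw driver API.
// `session`, `stmtA`, `stmtB` are assumptions; error handling omitted.
void submitBatch(CassSession* session, CassStatement* stmtA, CassStatement* stmtB, bool atomic)
{
    // LOGGED buys all-or-nothing semantics at a performance cost;
    // UNLOGGED merely groups statements into a single network round trip.
    CassBatch* batch = cass_batch_new(atomic ? CASS_BATCH_TYPE_LOGGED : CASS_BATCH_TYPE_UNLOGGED);
    cass_batch_add_statement(batch, stmtA);
    cass_batch_add_statement(batch, stmtB);
    CassFuture* future = cass_session_execute_batch(session, batch);
    cass_future_wait(future);
    cass_future_free(future);
    cass_batch_free(batch);
}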

View File

@@ -60,6 +60,17 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k
cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
// TODO: AWS keyspace reads should be local_one to save cost
if (settings.provider == toString(cassandra::impl::Provider::Keyspace)) {
if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) {
throw std::runtime_error(fmt::format("Error setting keyspace consistency: {}", cass_error_desc(rc)));
}
} else {
if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_QUORUM); rc != CASS_OK) {
throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc)));
}
}
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
rc != CASS_OK) {
throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc)));

View File

@@ -31,10 +31,29 @@
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <variant>
namespace data::cassandra::impl {
namespace {
enum class Provider { Cassandra, Keyspace };
inline std::string
toString(Provider provider)
{
switch (provider) {
case Provider::Cassandra:
return "cassandra";
case Provider::Keyspace:
return "aws_keyspace";
}
std::unreachable();
}
} // namespace
// TODO: move Settings to public interface, not impl
/**
@@ -45,6 +64,7 @@ struct Settings {
static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20;
static constexpr Provider kDEFAULT_PROVIDER = Provider::Cassandra;
/**
* @brief Represents the configuration of contact points for cassandra.
@@ -83,11 +103,14 @@ struct Settings {
uint32_t maxReadRequestsOutstanding = kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
/** @brief The number of connection per host to always have active */
uint32_t coreConnectionsPerHost = 1u;
uint32_t coreConnectionsPerHost = 3u;
/** @brief Size of batches when writing */
std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
/** @brief Provider used to distinguish Cassandra/ScyllaDB from AWS Keyspaces */
std::string provider = toString(kDEFAULT_PROVIDER);
/** @brief Size of the IO queue */
std::optional<uint32_t> queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init)

View File

@@ -58,14 +58,16 @@ public:
explicit Statement(std::string_view query, Args&&... args)
: ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
{
cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
// TODO: figure out how to set consistency level in config
// NOTE: Keyspace doesn't support QUORUM at write level
// cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
bind<Args...>(std::forward<Args>(args)...);
}
/* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
{
cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
// cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
}

View File

@@ -84,6 +84,11 @@ static constexpr std::array<char const*, 1> kDATABASE_TYPE = {"cassandra"};
*/
static constexpr std::array<char const*, 2> kPROCESSING_POLICY = {"parallel", "sequent"};
/**
* @brief Specific values that are accepted for the database provider in config.
*/
static constexpr std::array<char const*, 2> kPROVIDER = {"cassandra", "aws_keyspace"};
/**
* @brief An interface to enforce constraints on certain values within ClioConfigDefinition.
*/
@@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE};
static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE};
static constinit OneOf gValidateLogTag{"log.tag_style", kLOG_TAGS};
static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY};
static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER};
static constinit PositiveDouble gValidatePositiveDouble{};

View File

@@ -285,6 +285,8 @@ getClioConfig()
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.provider",
ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateProvider)},
{"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},

View File

@@ -173,6 +173,7 @@ This document provides a list of all available Clio configuration properties in
"Maximum number of outstanding read requests. Read requests are API calls that read from the database."},
KV{.key = "database.cassandra.threads",
.value = "Represents the number of threads that will be used for database operations."},
KV{.key = "database.cassandra.provider", .value = "The specific database backend provider we are using."},
KV{.key = "database.cassandra.core_connections_per_host",
.value = "The number of core connections per host for the Cassandra database."},
KV{.key = "database.cassandra.queue_size_io",

View File

@@ -44,6 +44,7 @@ using namespace util::config;
struct BackendCassandraFactoryTest : SyncAsioContextTest, util::prometheus::WithPrometheus {
static constexpr auto kKEYSPACE = "factory_test";
static constexpr auto kPROVIDER = "cassandra";
protected:
ClioConfigDefinition cfg_{
@@ -53,6 +54,7 @@ protected:
{"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kPROVIDER)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},

View File

@@ -93,6 +93,7 @@ protected:
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},

View File

@@ -102,6 +102,7 @@ protected:
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},

View File

@@ -58,6 +58,7 @@ getParseSettingsConfig(boost::json::value val)
{"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},