diff --git a/docs/config-description.md b/docs/config-description.md
index 57ed3aeb..afcf971b 100644
--- a/docs/config-description.md
+++ b/docs/config-description.md
@@ -89,6 +89,14 @@ This document provides a list of all available Clio configuration properties in
 - **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
 - **Description**: Represents the number of threads that will be used for database operations.
 
+### database.cassandra.provider
+
+- **Required**: True
+- **Type**: string
+- **Default value**: `cassandra`
+- **Constraints**: The value must be one of the following: `cassandra`, `aws_keyspace`.
+- **Description**: The database backend provider to use.
+
 ### database.cassandra.core_connections_per_host
 
 - **Required**: True
diff --git a/src/data/BackendFactory.hpp b/src/data/BackendFactory.hpp
index 2d247f81..3c7710ec 100644
--- a/src/data/BackendFactory.hpp
+++ b/src/data/BackendFactory.hpp
@@ -21,6 +21,7 @@
 #include "data/BackendInterface.hpp"
 #include "data/CassandraBackend.hpp"
+#include "data/KeyspaceBackend.hpp"
 #include "data/LedgerCacheInterface.hpp"
 #include "data/cassandra/SettingsProvider.hpp"
 #include "util/config/ConfigDefinition.hpp"
@@ -55,9 +56,15 @@ makeBackend(util::config::ClioConfigDefinition const& config, data::LedgerCacheI
     if (boost::iequals(type, "cassandra")) {
         auto const cfg = config.getObject("database." + type);
-        backend = std::make_shared<data::cassandra::CassandraBackend>(
-            data::cassandra::SettingsProvider{cfg}, cache, readOnly
-        );
+        if (cfg.getValueView("provider").asString() == toString(cassandra::impl::Provider::Keyspace)) {
+            backend = std::make_shared<data::cassandra::KeyspaceBackend>(
+                data::cassandra::SettingsProvider{cfg}, cache, readOnly
+            );
+        } else {
+            backend = std::make_shared<data::cassandra::CassandraBackend>(
+                data::cassandra::SettingsProvider{cfg}, cache, readOnly
+            );
+        }
     }
 
     if (!backend)
diff --git a/src/data/BackendInterface.hpp b/src/data/BackendInterface.hpp
index 898552f9..c10f8880 100644
--- a/src/data/BackendInterface.hpp
+++ b/src/data/BackendInterface.hpp
@@ -295,7 +295,7 @@ public:
      * @param account The account to fetch transactions for
      * @param limit The maximum number of transactions per result page
      * @param forward Whether to fetch the page forwards or backwards from the given cursor
-     * @param cursor The cursor to resume fetching from
+     * @param txnCursor The cursor to resume fetching from
      * @param yield The coroutine context
      * @return Results and a cursor to resume from
      */
@@ -304,7 +304,7 @@
         ripple::AccountID const& account,
         std::uint32_t limit,
         bool forward,
-        std::optional<TransactionsCursor> const& cursor,
+        std::optional<TransactionsCursor> const& txnCursor,
         boost::asio::yield_context yield
     ) const = 0;
 
diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp
index 00e5be6e..d7de8b40 100644
--- a/src/data/CassandraBackend.hpp
+++ b/src/data/CassandraBackend.hpp
@@ -19,20 +19,15 @@
 
 #pragma once
 
-#include "data/BackendInterface.hpp"
-#include "data/DBHelpers.hpp"
-#include "data/LedgerCacheInterface.hpp"
 #include "data/LedgerHeaderCache.hpp"
 #include "data/Types.hpp"
+#include "data/cassandra/CassandraBackendFamily.hpp"
+#include "data/cassandra/CassandraSchema.hpp"
 #include "data/cassandra/Concepts.hpp"
 #include "data/cassandra/Handle.hpp"
-#include "data/cassandra/Schema.hpp"
 #include "data/cassandra/SettingsProvider.hpp"
 #include "data/cassandra/Types.hpp"
 #include "data/cassandra/impl/ExecutionStrategy.hpp"
-#include "util/Assert.hpp"
-#include "util/LedgerUtils.hpp"
-#include "util/Profiler.hpp"
 #include "util/log/Logger.hpp"
 
 #include
@@ -50,27 +45,18 @@
 #include
 #include
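A minimal config sketch for the new `database.cassandra.provider` property documented in the `config-description.md` hunk above. The `provider` key, its two accepted values, and the `database.type`/`database.cassandra` nesting come from this patch; the surrounding keys (`contact_points`, `keyspace`) are assumed typical Clio Cassandra settings and are illustrative only:

```json
{
    "database": {
        "type": "cassandra",
        "cassandra": {
            "provider": "aws_keyspace",
            "contact_points": "127.0.0.1",
            "keyspace": "clio"
        }
    }
}
```

Omitting `provider` keeps the default `cassandra`, so `makeBackend` constructs the classic Cassandra/ScyllaDB backend and existing configs are unaffected.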
-#include -#include #include #include #include -#include #include -#include #include #include -#include #include -class CacheBackendCassandraTest; - namespace data::cassandra { /** - * @brief Implements @ref BackendInterface for Cassandra/ScyllaDB. - * - * Note: This is a safer and more correct rewrite of the original implementation of the backend. + * @brief Implements @ref CassandraBackendFamily for Cassandra/ScyllaDB. * * @tparam SettingsProviderType The settings provider type to use * @tparam ExecutionStrategyType The execution strategy type to use @@ -80,156 +66,46 @@ template < SomeSettingsProvider SettingsProviderType, SomeExecutionStrategy ExecutionStrategyType, typename FetchLedgerCacheType = FetchLedgerCache> -class BasicCassandraBackend : public BackendInterface { - util::Logger log_{"Backend"}; - - SettingsProviderType settingsProvider_; - Schema schema_; - std::atomic_uint32_t ledgerSequence_ = 0u; - friend class ::CacheBackendCassandraTest; +class BasicCassandraBackend : public CassandraBackendFamily< + SettingsProviderType, + ExecutionStrategyType, + CassandraSchema, + FetchLedgerCacheType> { + using DefaultCassandraFamily = CassandraBackendFamily< + SettingsProviderType, + ExecutionStrategyType, + CassandraSchema, + FetchLedgerCacheType>; + // protected because CassandraMigrationBackend inherits from this class protected: - Handle handle_; - - // have to be mutable because BackendInterface constness :( - mutable ExecutionStrategyType executor_; - // TODO: move to interface level - mutable FetchLedgerCacheType ledgerCache_{}; + using DefaultCassandraFamily::executor_; + using DefaultCassandraFamily::ledgerSequence_; + using DefaultCassandraFamily::log_; + using DefaultCassandraFamily::range_; + using DefaultCassandraFamily::schema_; public: /** - * @brief Create a new cassandra/scylla backend instance. - * - * @param settingsProvider The settings provider to use - * @param cache The ledger cache to use - * @param readOnly Whether the database should be in readonly mode + * @brief Inherit the constructors of the base class. */ - BasicCassandraBackend(SettingsProviderType settingsProvider, data::LedgerCacheInterface& cache, bool readOnly) - : BackendInterface(cache) - , settingsProvider_{std::move(settingsProvider)} - , schema_{settingsProvider_} - , handle_{settingsProvider_.getSettings()} - , executor_{settingsProvider_.getSettings(), handle_} - { - if (auto const res = handle_.connect(); not res) - throw std::runtime_error("Could not connect to database: " + res.error()); - - if (not readOnly) { - if (auto const res = handle_.execute(schema_.createKeyspace); not res) { - // on datastax, creation of keyspaces can be configured to only be done thru the admin - // interface. this does not mean that the keyspace does not already exist tho. - if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED) - throw std::runtime_error("Could not create keyspace: " + res.error()); - } - - if (auto const res = handle_.executeEach(schema_.createSchema); not res) - throw std::runtime_error("Could not create schema: " + res.error()); - } - - try { - schema_.prepareStatements(handle_); - } catch (std::runtime_error const& ex) { - auto const error = fmt::format( - "Failed to prepare the statements: {}; readOnly: {}. 
ReadOnly should be turned off or another Clio " - "node with write access to DB should be started first.", - ex.what(), - readOnly - ); - LOG(log_.error()) << error; - throw std::runtime_error(error); - } - LOG(log_.info()) << "Created (revamped) CassandraBackend"; - } + using DefaultCassandraFamily::DefaultCassandraFamily; /* * @brief Move constructor is deleted because handle_ is shared by reference with executor */ BasicCassandraBackend(BasicCassandraBackend&&) = delete; - TransactionsAndCursor - fetchAccountTransactions( - ripple::AccountID const& account, - std::uint32_t const limit, - bool forward, - std::optional const& cursorIn, - boost::asio::yield_context yield - ) const override - { - auto rng = fetchLedgerRange(); - if (!rng) - return {.txns = {}, .cursor = {}}; - - Statement const statement = [this, forward, &account]() { - if (forward) - return schema_->selectAccountTxForward.bind(account); - - return schema_->selectAccountTx.bind(account); - }(); - - auto cursor = cursorIn; - if (cursor) { - statement.bindAt(1, cursor->asTuple()); - LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence - << cursor->transactionIndex; - } else { - auto const seq = forward ? rng->minSequence : rng->maxSequence; - auto const placeHolder = forward ? 0u : std::numeric_limits::max(); - - statement.bindAt(1, std::make_tuple(placeHolder, placeHolder)); - LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq - << " tuple = " << placeHolder; - } - - // FIXME: Limit is a hack to support uint32_t properly for the time - // being. Should be removed later and schema updated to use proper - // types. - statement.bindAt(2, Limit{limit}); - auto const res = executor_.read(yield, statement); - auto const& results = res.value(); - if (not results.hasRows()) { - LOG(log_.debug()) << "No rows returned"; - return {}; - } - - std::vector hashes = {}; - auto numRows = results.numRows(); - LOG(log_.info()) << "num_rows = " << numRows; - - for (auto [hash, data] : extract>(results)) { - hashes.push_back(hash); - if (--numRows == 0) { - LOG(log_.debug()) << "Setting cursor"; - cursor = data; - } - } - - auto const txns = fetchTransactions(hashes, yield); - LOG(log_.debug()) << "Txns = " << txns.size(); - - if (txns.size() == limit) { - LOG(log_.debug()) << "Returning cursor"; - return {txns, cursor}; - } - - return {txns, {}}; - } - - void - waitForWritesToFinish() override - { - executor_.sync(); - } - bool doFinishWrites() override { - waitForWritesToFinish(); + this->waitForWritesToFinish(); if (!range_) { executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_); } - if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) { + if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) { LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_; return false; } @@ -238,271 +114,6 @@ public: return true; } - void - writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override - { - executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob)); - - executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq); - - ledgerSequence_ = ledgerHeader.seq; - } - - std::optional - fetchLatestLedgerSequence(boost::asio::yield_context yield) const override - { - if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res) { - if (auto const& 
result = res.value(); result) { - if (auto const maybeValue = result.template get(); maybeValue) - return maybeValue; - - LOG(log_.error()) << "Could not fetch latest ledger - no rows"; - return std::nullopt; - } - - LOG(log_.error()) << "Could not fetch latest ledger - no result"; - } else { - LOG(log_.error()) << "Could not fetch latest ledger: " << res.error(); - } - - return std::nullopt; - } - - std::optional - fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override - { - if (auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence) - return lock->ledger; - - auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence); - if (res) { - if (auto const& result = res.value(); result) { - if (auto const maybeValue = result.template get>(); maybeValue) { - auto const header = util::deserializeHeader(ripple::makeSlice(*maybeValue)); - ledgerCache_.put(FetchLedgerCache::CacheEntry{header, sequence}); - return header; - } - - LOG(log_.error()) << "Could not fetch ledger by sequence - no rows"; - return std::nullopt; - } - - LOG(log_.error()) << "Could not fetch ledger by sequence - no result"; - } else { - LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error(); - } - - return std::nullopt; - } - - std::optional - fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override - { - if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) { - if (auto const& result = res.value(); result) { - if (auto const maybeValue = result.template get(); maybeValue) - return fetchLedgerBySequence(*maybeValue, yield); - - LOG(log_.error()) << "Could not fetch ledger by hash - no rows"; - return std::nullopt; - } - - LOG(log_.error()) << "Could not fetch ledger by hash - no result"; - } else { - LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error(); - } - - return std::nullopt; - } - - std::optional - hardFetchLedgerRange(boost::asio::yield_context yield) const override - { - auto const res = executor_.read(yield, schema_->selectLedgerRange); - if (res) { - auto const& results = res.value(); - if (not results.hasRows()) { - LOG(log_.debug()) << "Could not fetch ledger range - no rows"; - return std::nullopt; - } - - // TODO: this is probably a good place to use user type in - // cassandra instead of having two rows with bool flag. or maybe at - // least use tuple? 
- LedgerRange range; - std::size_t idx = 0; - for (auto [seq] : extract(results)) { - if (idx == 0) { - range.maxSequence = range.minSequence = seq; - } else if (idx == 1) { - range.maxSequence = seq; - } - - ++idx; - } - - if (range.minSequence > range.maxSequence) - std::swap(range.minSequence, range.maxSequence); - - LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":" - << range.maxSequence; - return range; - } - LOG(log_.error()) << "Could not fetch ledger range: " << res.error(); - - return std::nullopt; - } - - std::vector - fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override - { - auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield); - return fetchTransactions(hashes, yield); - } - - std::vector - fetchAllTransactionHashesInLedger( - std::uint32_t const ledgerSequence, - boost::asio::yield_context yield - ) const override - { - auto start = std::chrono::system_clock::now(); - auto const res = executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence); - - if (not res) { - LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error(); - return {}; - } - - auto const& result = res.value(); - if (not result.hasRows()) { - LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = " - << std::to_string(ledgerSequence); - return {}; - } - - std::vector hashes; - for (auto [hash] : extract(result)) - hashes.push_back(std::move(hash)); - - auto end = std::chrono::system_clock::now(); - LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from database in " - << std::chrono::duration_cast(end - start).count() - << " milliseconds"; - - return hashes; - } - - std::optional - fetchNFT( - ripple::uint256 const& tokenID, - std::uint32_t const ledgerSequence, - boost::asio::yield_context yield - ) const override - { - auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence); - if (not res) - return std::nullopt; - - if (auto const maybeRow = res->template get(); maybeRow) { - auto [seq, owner, isBurned] = *maybeRow; - auto result = std::make_optional(tokenID, seq, owner, isBurned); - - // now fetch URI. Usually we will have the URI even for burned NFTs, - // but if the first ledger on this clio included NFTokenBurn - // transactions we will not have the URIs for any of those tokens. - // In any other case not having the URI indicates something went - // wrong with our data. - // - // TODO - in the future would be great for any handlers that use - // this could inject a warning in this case (the case of not having - // a URI because it was burned in the first ledger) to indicate that - // even though we are returning a blank URI, the NFT might have had - // one. 
- auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence); - if (uriRes) { - if (auto const maybeUri = uriRes->template get(); maybeUri) - result->uri = *maybeUri; - } - - return result; - } - - LOG(log_.error()) << "Could not fetch NFT - no rows"; - return std::nullopt; - } - - TransactionsAndCursor - fetchNFTTransactions( - ripple::uint256 const& tokenID, - std::uint32_t const limit, - bool const forward, - std::optional const& cursorIn, - boost::asio::yield_context yield - ) const override - { - auto rng = fetchLedgerRange(); - if (!rng) - return {.txns = {}, .cursor = {}}; - - Statement const statement = [this, forward, &tokenID]() { - if (forward) - return schema_->selectNFTTxForward.bind(tokenID); - - return schema_->selectNFTTx.bind(tokenID); - }(); - - auto cursor = cursorIn; - if (cursor) { - statement.bindAt(1, cursor->asTuple()); - LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence - << cursor->transactionIndex; - } else { - auto const seq = forward ? rng->minSequence : rng->maxSequence; - auto const placeHolder = forward ? 0 : std::numeric_limits::max(); - - statement.bindAt(1, std::make_tuple(placeHolder, placeHolder)); - LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq - << " tuple = " << placeHolder; - } - - statement.bindAt(2, Limit{limit}); - - auto const res = executor_.read(yield, statement); - auto const& results = res.value(); - if (not results.hasRows()) { - LOG(log_.debug()) << "No rows returned"; - return {}; - } - - std::vector hashes = {}; - auto numRows = results.numRows(); - LOG(log_.info()) << "num_rows = " << numRows; - - for (auto [hash, data] : extract>(results)) { - hashes.push_back(hash); - if (--numRows == 0) { - LOG(log_.debug()) << "Setting cursor"; - cursor = data; - - // forward queries by ledger/tx sequence `>=` - // so we have to advance the index by one - if (forward) - ++cursor->transactionIndex; - } - } - - auto const txns = fetchTransactions(hashes, yield); - LOG(log_.debug()) << "NFT Txns = " << txns.size(); - - if (txns.size() == limit) { - LOG(log_.debug()) << "Returning cursor"; - return {txns, cursor}; - } - - return {txns, {}}; - } - NFTsAndCursor fetchNFTsByIssuer( ripple::AccountID const& issuer, @@ -589,208 +200,6 @@ public: return ret; } - MPTHoldersAndCursor - fetchMPTHolders( - ripple::uint192 const& mptID, - std::uint32_t const limit, - std::optional const& cursorIn, - std::uint32_t const ledgerSequence, - boost::asio::yield_context yield - ) const override - { - auto const holderEntries = executor_.read( - yield, schema_->selectMPTHolders, mptID, cursorIn.value_or(ripple::AccountID(0)), Limit{limit} - ); - - auto const& holderResults = holderEntries.value(); - if (not holderResults.hasRows()) { - LOG(log_.debug()) << "No rows returned"; - return {}; - } - - std::vector mptKeys; - std::optional cursor; - for (auto const [holder] : extract(holderResults)) { - mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key); - cursor = holder; - } - - auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield); - - auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) { return mpt.empty(); }); - - mptObjects.erase(it, mptObjects.end()); - - ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit"); - if (mptKeys.size() == limit) - return {mptObjects, cursor}; - - return {mptObjects, {}}; - } - - std::optional - doFetchLedgerObject( - ripple::uint256 const& key, - 
std::uint32_t const sequence, - boost::asio::yield_context yield - ) const override - { - LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key); - if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) { - if (auto const result = res->template get(); result) { - if (result->size()) - return result; - } else { - LOG(log_.debug()) << "Could not fetch ledger object - no rows"; - } - } else { - LOG(log_.error()) << "Could not fetch ledger object: " << res.error(); - } - - return std::nullopt; - } - - std::optional - doFetchLedgerObjectSeq( - ripple::uint256 const& key, - std::uint32_t const sequence, - boost::asio::yield_context yield - ) const override - { - LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key); - if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) { - if (auto const result = res->template get(); result) { - auto [_, seq] = result.value(); - return seq; - } - LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows"; - } else { - LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error(); - } - - return std::nullopt; - } - - std::optional - fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override - { - if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) { - if (auto const maybeValue = res->template get(); maybeValue) { - auto [transaction, meta, seq, date] = *maybeValue; - return std::make_optional(transaction, meta, seq, date); - } - - LOG(log_.debug()) << "Could not fetch transaction - no rows"; - } else { - LOG(log_.error()) << "Could not fetch transaction: " << res.error(); - } - - return std::nullopt; - } - - std::optional - doFetchSuccessorKey( - ripple::uint256 key, - std::uint32_t const ledgerSequence, - boost::asio::yield_context yield - ) const override - { - if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) { - if (auto const result = res->template get(); result) { - if (*result == kLAST_KEY) - return std::nullopt; - return result; - } - - LOG(log_.debug()) << "Could not fetch successor - no rows"; - } else { - LOG(log_.error()) << "Could not fetch successor: " << res.error(); - } - - return std::nullopt; - } - - std::vector - fetchTransactions(std::vector const& hashes, boost::asio::yield_context yield) const override - { - if (hashes.empty()) - return {}; - - auto const numHashes = hashes.size(); - std::vector results; - results.reserve(numHashes); - - std::vector statements; - statements.reserve(numHashes); - - auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() { - // TODO: seems like a job for "hash IN (list of hashes)" instead? 
- std::transform( - std::cbegin(hashes), std::cend(hashes), std::back_inserter(statements), [this](auto const& hash) { - return schema_->selectTransaction.bind(hash); - } - ); - - auto const entries = executor_.readEach(yield, statements); - std::transform( - std::cbegin(entries), - std::cend(entries), - std::back_inserter(results), - [](auto const& res) -> TransactionAndMetadata { - if (auto const maybeRow = res.template get(); maybeRow) - return *maybeRow; - - return {}; - } - ); - }); - - ASSERT(numHashes == results.size(), "Number of hashes and results must match"); - LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in " << timeDiff - << " milliseconds"; - return results; - } - - std::vector - doFetchLedgerObjects( - std::vector const& keys, - std::uint32_t const sequence, - boost::asio::yield_context yield - ) const override - { - if (keys.empty()) - return {}; - - auto const numKeys = keys.size(); - LOG(log_.trace()) << "Fetching " << numKeys << " objects"; - - std::vector results; - results.reserve(numKeys); - - std::vector statements; - statements.reserve(numKeys); - - // TODO: seems like a job for "key IN (list of keys)" instead? - std::transform( - std::cbegin(keys), std::cend(keys), std::back_inserter(statements), [this, &sequence](auto const& key) { - return schema_->selectObject.bind(key, sequence); - } - ); - - auto const entries = executor_.readEach(yield, statements); - std::transform( - std::cbegin(entries), std::cend(entries), std::back_inserter(results), [](auto const& res) -> Blob { - if (auto const maybeValue = res.template get(); maybeValue) - return *maybeValue; - - return {}; - } - ); - - LOG(log_.trace()) << "Fetched " << numKeys << " objects"; - return results; - } - std::vector fetchAccountRoots( std::uint32_t number, @@ -819,7 +228,7 @@ public: fullAccounts.push_back(ripple::keylet::account(account).key); lastItem = account; } - auto const objs = doFetchLedgerObjects(fullAccounts, seq, yield); + auto const objs = this->doFetchLedgerObjects(fullAccounts, seq, yield); for (auto i = 0u; i < fullAccounts.size(); i++) { if (not objs[i].empty()) { @@ -838,284 +247,6 @@ public: return liveAccounts; } - - std::vector - fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override - { - auto const [keys, timeDiff] = util::timed([this, &ledgerSequence, yield]() -> std::vector { - auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence); - if (not res) { - LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence; - return {}; - } - - auto const& results = res.value(); - if (not results) { - LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence; - return {}; - } - - std::vector resultKeys; - for (auto [key] : extract(results)) - resultKeys.push_back(key); - - return resultKeys; - }); - - // one of the above errors must have happened - if (keys.empty()) - return {}; - - LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in " << timeDiff - << " milliseconds"; - - auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield); - std::vector results; - results.reserve(keys.size()); - - std::transform( - std::cbegin(keys), - std::cend(keys), - std::cbegin(objs), - std::back_inserter(results), - [](auto const& key, auto const& obj) { return LedgerObject{key, obj}; } - ); - - return results; - } - - std::optional - fetchMigratorStatus(std::string const& migratorName, 
boost::asio::yield_context yield) const override - { - auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName)); - if (not res) { - LOG(log_.error()) << "Could not fetch migrator status: " << res.error(); - return {}; - } - - auto const& results = res.value(); - if (not results) { - return {}; - } - - for (auto [statusString] : extract(results)) - return statusString; - - return {}; - } - - std::expected>, std::string> - fetchClioNodesData(boost::asio::yield_context yield) const override - { - auto const readResult = executor_.read(yield, schema_->selectClioNodesData); - if (not readResult) - return std::unexpected{readResult.error().message()}; - - std::vector> result; - - for (auto [uuid, message] : extract(*readResult)) { - result.emplace_back(uuid, std::move(message)); - } - - return result; - } - - void - doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override - { - LOG(log_.trace()) << " Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]"; - - if (range_) - executor_.write(schema_->insertDiff, seq, key); - - executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob)); - } - - void - writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override - { - LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. " - << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes."; - ASSERT(!key.empty(), "Key must not be empty"); - ASSERT(!successor.empty(), "Successor must not be empty"); - - executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor)); - } - - void - writeAccountTransactions(std::vector data) override - { - std::vector statements; - statements.reserve(data.size() * 10); // assume 10 transactions avg - - for (auto& record : data) { - std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) { - return schema_->insertAccountTx.bind( - std::forward(account), - std::make_tuple(record.ledgerSequence, record.transactionIndex), - record.txHash - ); - }); - } - - executor_.write(std::move(statements)); - } - - void - writeAccountTransaction(AccountTransactionsData record) override - { - std::vector statements; - statements.reserve(record.accounts.size()); - - std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) { - return schema_->insertAccountTx.bind( - std::forward(account), - std::make_tuple(record.ledgerSequence, record.transactionIndex), - record.txHash - ); - }); - - executor_.write(std::move(statements)); - } - - void - writeNFTTransactions(std::vector const& data) override - { - std::vector statements; - statements.reserve(data.size()); - - std::ranges::transform(data, std::back_inserter(statements), [this](auto const& record) { - return schema_->insertNFTTx.bind( - record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash - ); - }); - - executor_.write(std::move(statements)); - } - - void - writeTransaction( - std::string&& hash, - std::uint32_t const seq, - std::uint32_t const date, - std::string&& transaction, - std::string&& metadata - ) override - { - LOG(log_.trace()) << "Writing txn to database"; - - executor_.write(schema_->insertLedgerTransaction, seq, hash); - executor_.write( - schema_->insertTransaction, std::move(hash), seq, date, std::move(transaction), std::move(metadata) - ); - } - - void - 
writeNFTs(std::vector const& data) override - { - std::vector statements; - statements.reserve(data.size() * 3); - - for (NFTsData const& record : data) { - if (!record.onlyUriChanged) { - statements.push_back( - schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned) - ); - - // If `uri` is set (and it can be set to an empty uri), we know this - // is a net-new NFT. That is, this NFT has not been seen before by - // us _OR_ it is in the extreme edge case of a re-minted NFT ID with - // the same NFT ID as an already-burned token. In this case, we need - // to record the URI and link to the issuer_nf_tokens table. - if (record.uri) { - statements.push_back(schema_->insertIssuerNFT.bind( - ripple::nft::getIssuer(record.tokenID), - static_cast(ripple::nft::getTaxon(record.tokenID)), - record.tokenID - )); - statements.push_back( - schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value()) - ); - } - } else { - // only uri changed, we update the uri table only - statements.push_back( - schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value()) - ); - } - } - - executor_.writeEach(std::move(statements)); - } - - void - writeMPTHolders(std::vector const& data) override - { - std::vector statements; - statements.reserve(data.size()); - for (auto [mptId, holder] : data) - statements.push_back(schema_->insertMPTHolder.bind(mptId, holder)); - - executor_.write(std::move(statements)); - } - - void - startWrites() const override - { - // Note: no-op in original implementation too. - // probably was used in PG to start a transaction or smth. - } - - void - writeMigratorStatus(std::string const& migratorName, std::string const& status) override - { - executor_.writeSync( - schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status) - ); - } - - void - writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override - { - executor_.writeSync(schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid); - } - - bool - isTooBusy() const override - { - return executor_.isTooBusy(); - } - - boost::json::object - stats() const override - { - return executor_.stats(); - } - -private: - bool - executeSyncUpdate(Statement statement) - { - auto const res = executor_.writeSync(statement); - auto maybeSuccess = res->template get(); - if (not maybeSuccess) { - LOG(log_.error()) << "executeSyncUpdate - error getting result - no row"; - return false; - } - - if (not maybeSuccess.value()) { - LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect"; - - // error may indicate that another writer wrote something. - // in this case let's just compare the current state of things - // against what we were trying to write in the first place and - // use that as the source of truth for the result. - auto rng = hardFetchLedgerRangeNoThrow(); - return rng && rng->maxSequence == ledgerSequence_; - } - - return true; - } }; using CassandraBackend = BasicCassandraBackend>; diff --git a/src/data/KeyspaceBackend.hpp b/src/data/KeyspaceBackend.hpp new file mode 100644 index 00000000..26971237 --- /dev/null +++ b/src/data/KeyspaceBackend.hpp @@ -0,0 +1,308 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. 
+ + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/LedgerHeaderCache.hpp" +#include "data/Types.hpp" +#include "data/cassandra/CassandraBackendFamily.hpp" +#include "data/cassandra/Concepts.hpp" +#include "data/cassandra/KeyspaceSchema.hpp" +#include "data/cassandra/SettingsProvider.hpp" +#include "data/cassandra/Types.hpp" +#include "data/cassandra/impl/ExecutionStrategy.hpp" +#include "util/Assert.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace data::cassandra { + +/** + * @brief Implements @ref CassandraBackendFamily for Keyspace + * + * @tparam SettingsProviderType The settings provider type to use + * @tparam ExecutionStrategyType The execution strategy type to use + * @tparam FetchLedgerCacheType The ledger header cache type to use + */ +template < + SomeSettingsProvider SettingsProviderType, + SomeExecutionStrategy ExecutionStrategyType, + typename FetchLedgerCacheType = FetchLedgerCache> +class BasicKeyspaceBackend : public CassandraBackendFamily< + SettingsProviderType, + ExecutionStrategyType, + KeyspaceSchema, + FetchLedgerCacheType> { + using DefaultCassandraFamily = CassandraBackendFamily< + SettingsProviderType, + ExecutionStrategyType, + KeyspaceSchema, + FetchLedgerCacheType>; + + using DefaultCassandraFamily::executor_; + using DefaultCassandraFamily::ledgerSequence_; + using DefaultCassandraFamily::log_; + using DefaultCassandraFamily::range_; + using DefaultCassandraFamily::schema_; + +public: + /** + * @brief Inherit the constructors of the base class. + */ + using DefaultCassandraFamily::DefaultCassandraFamily; + + /** + * @brief Move constructor is deleted because handle_ is shared by reference with executor + */ + BasicKeyspaceBackend(BasicKeyspaceBackend&&) = delete; + + bool + doFinishWrites() override + { + this->waitForWritesToFinish(); + + // !range_.has_value() means the table 'ledger_range' is not populated; + // This would be the first write to the table. + // In this case, insert both min_sequence/max_sequence range into the table. 
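+        // (The 'ledger_range' table holds exactly two rows keyed by a boolean:
+        // the row keyed false stores min_sequence, the row keyed true stores
+        // max_sequence. After this first write, later commits only advance the
+        // true row, guarded below by the previous max, ledgerSequence_ - 1.)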
+        if (not range_.has_value()) {
+            executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
+            executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
+        }
+
+        if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
+            LOG(log_.warn()) << "Update failed for ledger " << ledgerSequence_;
+            return false;
+        }
+
+        LOG(log_.info()) << "Committed ledger " << ledgerSequence_;
+        return true;
+    }
+
+    NFTsAndCursor
+    fetchNFTsByIssuer(
+        ripple::AccountID const& issuer,
+        std::optional<std::uint32_t> const& taxon,
+        std::uint32_t const ledgerSequence,
+        std::uint32_t const limit,
+        std::optional<ripple::uint256> const& cursorIn,
+        boost::asio::yield_context yield
+    ) const override
+    {
+        std::vector<ripple::uint256> nftIDs;
+        if (taxon.has_value()) {
+            // Keyspaces and ScyllaDB use the same logic for taxon-filtered queries
+            nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
+        } else {
+            // Amazon Keyspaces requires a two-phase workflow for non-taxon queries
+            nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield);
+        }
+        return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
+    }
+
+    /**
+     * @brief (Unsupported in Keyspaces) Fetches account root object indexes by page.
+     *
+     * @note Loading the cache by enumerating all accounts is currently unsupported by the AWS Keyspaces backend.
+     * This function's logic relies on "PER PARTITION LIMIT 1", which Keyspaces does not support, and there is
+     * no efficient alternative. This is acceptable because the cache is primarily loaded via diffs. Calling this
+     * function fails an assertion.
+     *
+     * @param number The total number of accounts to fetch.
+     * @param pageSize The maximum number of accounts per page.
+     * @param seq The accounts need to exist at this ledger sequence.
+     * @param yield The coroutine context.
+     * @return A vector of ripple::uint256 representing the account root hashes.
+ */ + std::vector + fetchAccountRoots( + [[maybe_unused]] std::uint32_t number, + [[maybe_unused]] std::uint32_t pageSize, + [[maybe_unused]] std::uint32_t seq, + [[maybe_unused]] boost::asio::yield_context yield + ) const override + { + ASSERT(false, "Fetching account roots is not supported by the Keyspaces backend."); + std::unreachable(); + } + +private: + std::vector + fetchNFTIDsByTaxon( + ripple::AccountID const& issuer, + std::uint32_t const taxon, + std::uint32_t const limit, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const + { + std::vector nftIDs; + Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + statement.bindAt(1, taxon); + statement.bindAt(2, cursorIn.value_or(ripple::uint256(0))); + statement.bindAt(3, Limit{limit}); + + auto const res = executor_.read(yield, statement); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); + } + return nftIDs; + } + + std::vector + fetchNFTIDsWithoutTaxon( + ripple::AccountID const& issuer, + std::uint32_t const limit, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const + { + std::vector nftIDs; + + auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0; + auto const startTokenID = cursorIn.value_or(ripple::uint256(0)); + + Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + firstQuery.bindAt(1, startTaxon); + firstQuery.bindAt(2, startTokenID); + firstQuery.bindAt(3, Limit{limit}); + + auto const firstRes = executor_.read(yield, firstQuery); + if (firstRes) { + for (auto const [nftID] : extract(firstRes.value())) + nftIDs.push_back(nftID); + } + + if (nftIDs.size() < limit) { + auto const remainingLimit = limit - nftIDs.size(); + Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces.bind(issuer); + secondQuery.bindAt(1, startTaxon); + secondQuery.bindAt(2, Limit{remainingLimit}); + + auto const secondRes = executor_.read(yield, secondQuery); + if (secondRes) { + for (auto const [nftID] : extract(secondRes.value())) + nftIDs.push_back(nftID); + } + } + return nftIDs; + } + + /** + * @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor. + */ + NFTsAndCursor + populateNFTsAndCreateCursor( + std::vector const& nftIDs, + std::uint32_t const ledgerSequence, + std::uint32_t const limit, + boost::asio::yield_context yield + ) const + { + if (nftIDs.empty()) { + LOG(log_.debug()) << "No rows returned"; + return {}; + } + + NFTsAndCursor ret; + if (nftIDs.size() == limit) + ret.cursor = nftIDs.back(); + + // Prepare and execute queries to fetch NFT info and URIs in parallel. + std::vector selectNFTStatements; + selectNFTStatements.reserve(nftIDs.size()); + std::transform( + std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) { + return schema_->selectNFT.bind(nftID, ledgerSequence); + } + ); + + std::vector selectNFTURIStatements; + selectNFTURIStatements.reserve(nftIDs.size()); + std::transform( + std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) { + return schema_->selectNFTURI.bind(nftID, ledgerSequence); + } + ); + + auto const nftInfos = executor_.readEach(yield, selectNFTStatements); + auto const nftUris = executor_.readEach(yield, selectNFTURIStatements); + + // Combine the results into final NFT objects. 
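+        // (This indexed zip assumes executor_.readEach returns exactly one
+        // result per statement, in submission order, so nftInfos[i] and
+        // nftUris[i] both correspond to nftIDs[i].)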
+ for (auto i = 0u; i < nftIDs.size(); ++i) { + if (auto const maybeRow = nftInfos[i].template get(); maybeRow) { + auto [seq, owner, isBurned] = *maybeRow; + NFT nft(nftIDs[i], seq, owner, isBurned); + if (auto const maybeUri = nftUris[i].template get(); maybeUri) + nft.uri = *maybeUri; + ret.nfts.push_back(nft); + } + } + return ret; + } +}; + +using KeyspaceBackend = BasicKeyspaceBackend>; + +} // namespace data::cassandra diff --git a/src/data/cassandra/CassandraBackendFamily.hpp b/src/data/cassandra/CassandraBackendFamily.hpp new file mode 100644 index 00000000..7233198d --- /dev/null +++ b/src/data/cassandra/CassandraBackendFamily.hpp @@ -0,0 +1,1077 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "data/DBHelpers.hpp" +#include "data/LedgerCacheInterface.hpp" +#include "data/LedgerHeaderCache.hpp" +#include "data/Types.hpp" +#include "data/cassandra/Concepts.hpp" +#include "data/cassandra/Handle.hpp" +#include "data/cassandra/Types.hpp" +#include "data/cassandra/impl/ExecutionStrategy.hpp" +#include "util/Assert.hpp" +#include "util/LedgerUtils.hpp" +#include "util/Profiler.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class CacheBackendCassandraTest; + +namespace data::cassandra { + +/** + * @brief Implements @ref BackendInterface for Cassandra/ScyllaDB/Keyspace. + * + * Note: This is a safer and more correct rewrite of the original implementation of the backend. 
+ * + * @tparam SettingsProviderType The settings provider type to use + * @tparam ExecutionStrategyType The execution strategy type to use + * @tparam SchemaType The Schema type to use + * @tparam FetchLedgerCacheType The ledger header cache type to use + */ +template < + SomeSettingsProvider SettingsProviderType, + SomeExecutionStrategy ExecutionStrategyType, + typename SchemaType, + typename FetchLedgerCacheType = FetchLedgerCache> +class CassandraBackendFamily : public BackendInterface { +protected: + util::Logger log_{"Backend"}; + + SettingsProviderType settingsProvider_; + SchemaType schema_; + std::atomic_uint32_t ledgerSequence_ = 0u; + friend class ::CacheBackendCassandraTest; + + Handle handle_; + + // have to be mutable because BackendInterface constness :( + mutable ExecutionStrategyType executor_; + // TODO: move to interface level + mutable FetchLedgerCacheType ledgerCache_{}; + +public: + /** + * @brief Create a new cassandra/scylla backend instance. + * + * @param settingsProvider The settings provider to use + * @param cache The ledger cache to use + * @param readOnly Whether the database should be in readonly mode + */ + CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface& cache, bool readOnly) + : BackendInterface(cache) + , settingsProvider_{std::move(settingsProvider)} + , schema_{settingsProvider_} + , handle_{settingsProvider_.getSettings()} + , executor_{settingsProvider_.getSettings(), handle_} + { + if (auto const res = handle_.connect(); not res) + throw std::runtime_error("Could not connect to database: " + res.error()); + + if (not readOnly) { + if (auto const res = handle_.execute(schema_.createKeyspace); not res) { + // on datastax, creation of keyspaces can be configured to only be done thru the admin + // interface. this does not mean that the keyspace does not already exist tho. + if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED) + throw std::runtime_error("Could not create keyspace: " + res.error()); + } + + if (auto const res = handle_.executeEach(schema_.createSchema); not res) + throw std::runtime_error("Could not create schema: " + res.error()); + } + + try { + schema_.prepareStatements(handle_); + } catch (std::runtime_error const& ex) { + auto const error = fmt::format( + "Failed to prepare the statements: {}; readOnly: {}. 
ReadOnly should be turned off or another Clio " + "node with write access to DB should be started first.", + ex.what(), + readOnly + ); + LOG(log_.error()) << error; + throw std::runtime_error(error); + } + LOG(log_.info()) << "Created (revamped) CassandraBackend"; + } + + /* + * @brief Move constructor is deleted because handle_ is shared by reference with executor + */ + CassandraBackendFamily(CassandraBackendFamily&&) = delete; + + /** + * @copydoc BackendInterface::fetchAccountTransactions + */ + TransactionsAndCursor + fetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t const limit, + bool forward, + std::optional const& txnCursor, + boost::asio::yield_context yield + ) const override + { + auto rng = fetchLedgerRange(); + if (!rng) + return {.txns = {}, .cursor = {}}; + + Statement const statement = [this, forward, &account]() { + if (forward) + return schema_->selectAccountTxForward.bind(account); + + return schema_->selectAccountTx.bind(account); + }(); + + auto cursor = txnCursor; + if (cursor) { + statement.bindAt(1, cursor->asTuple()); + LOG(log_.debug()) << "account = " << ripple::strHex(account) << " tuple = " << cursor->ledgerSequence + << cursor->transactionIndex; + } else { + auto const seq = forward ? rng->minSequence : rng->maxSequence; + auto const placeHolder = forward ? 0u : std::numeric_limits::max(); + + statement.bindAt(1, std::make_tuple(placeHolder, placeHolder)); + LOG(log_.debug()) << "account = " << ripple::strHex(account) << " idx = " << seq + << " tuple = " << placeHolder; + } + + // FIXME: Limit is a hack to support uint32_t properly for the time + // being. Should be removed later and schema updated to use proper + // types. + statement.bindAt(2, Limit{limit}); + auto const res = executor_.read(yield, statement); + auto const& results = res.value(); + if (not results.hasRows()) { + LOG(log_.debug()) << "No rows returned"; + return {}; + } + + std::vector hashes = {}; + auto numRows = results.numRows(); + LOG(log_.info()) << "num_rows = " << numRows; + + for (auto [hash, data] : extract>(results)) { + hashes.push_back(hash); + if (--numRows == 0) { + LOG(log_.debug()) << "Setting cursor"; + cursor = data; + } + } + + auto const txns = fetchTransactions(hashes, yield); + LOG(log_.debug()) << "Txns = " << txns.size(); + + if (txns.size() == limit) { + LOG(log_.debug()) << "Returning cursor"; + return {txns, cursor}; + } + + return {txns, {}}; + } + + /** + * @copydoc BackendInterface::waitForWritesToFinish + */ + void + waitForWritesToFinish() override + { + executor_.sync(); + } + + /** + * @copydoc BackendInterface::writeLedger + */ + void + writeLedger(ripple::LedgerHeader const& ledgerHeader, std::string&& blob) override + { + executor_.write(schema_->insertLedgerHeader, ledgerHeader.seq, std::move(blob)); + + executor_.write(schema_->insertLedgerHash, ledgerHeader.hash, ledgerHeader.seq); + + ledgerSequence_ = ledgerHeader.seq; + } + + /** + * @copydoc BackendInterface::fetchLatestLedgerSequence + */ + std::optional + fetchLatestLedgerSequence(boost::asio::yield_context yield) const override + { + if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res) { + if (auto const& result = res.value(); result) { + if (auto const maybeValue = result.template get(); maybeValue) + return maybeValue; + + LOG(log_.error()) << "Could not fetch latest ledger - no rows"; + return std::nullopt; + } + + LOG(log_.error()) << "Could not fetch latest ledger - no result"; + } else { + LOG(log_.error()) << "Could not fetch latest 
ledger: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::fetchLedgerBySequence + */ + std::optional + fetchLedgerBySequence(std::uint32_t const sequence, boost::asio::yield_context yield) const override + { + if (auto const lock = ledgerCache_.get(); lock.has_value() && lock->seq == sequence) + return lock->ledger; + + auto const res = executor_.read(yield, schema_->selectLedgerBySeq, sequence); + if (res) { + if (auto const& result = res.value(); result) { + if (auto const maybeValue = result.template get>(); maybeValue) { + auto const header = util::deserializeHeader(ripple::makeSlice(*maybeValue)); + ledgerCache_.put(FetchLedgerCache::CacheEntry{header, sequence}); + return header; + } + + LOG(log_.error()) << "Could not fetch ledger by sequence - no rows"; + return std::nullopt; + } + + LOG(log_.error()) << "Could not fetch ledger by sequence - no result"; + } else { + LOG(log_.error()) << "Could not fetch ledger by sequence: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::fetchLedgerByHash + */ + std::optional + fetchLedgerByHash(ripple::uint256 const& hash, boost::asio::yield_context yield) const override + { + if (auto const res = executor_.read(yield, schema_->selectLedgerByHash, hash); res) { + if (auto const& result = res.value(); result) { + if (auto const maybeValue = result.template get(); maybeValue) + return fetchLedgerBySequence(*maybeValue, yield); + + LOG(log_.error()) << "Could not fetch ledger by hash - no rows"; + return std::nullopt; + } + + LOG(log_.error()) << "Could not fetch ledger by hash - no result"; + } else { + LOG(log_.error()) << "Could not fetch ledger by hash: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::hardFetchLedgerRange(boost::asio::yield_context) const + */ + std::optional + hardFetchLedgerRange(boost::asio::yield_context yield) const override + { + auto const res = executor_.read(yield, schema_->selectLedgerRange); + if (res) { + auto const& results = res.value(); + if (not results.hasRows()) { + LOG(log_.debug()) << "Could not fetch ledger range - no rows"; + return std::nullopt; + } + + // TODO: this is probably a good place to use user type in + // cassandra instead of having two rows with bool flag. or maybe at + // least use tuple? 
+ LedgerRange range; + std::size_t idx = 0; + for (auto [seq] : extract(results)) { + if (idx == 0) { + range.maxSequence = range.minSequence = seq; + } else if (idx == 1) { + range.maxSequence = seq; + } + + ++idx; + } + + if (range.minSequence > range.maxSequence) + std::swap(range.minSequence, range.maxSequence); + + LOG(log_.debug()) << "After hardFetchLedgerRange range is " << range.minSequence << ":" + << range.maxSequence; + return range; + } + LOG(log_.error()) << "Could not fetch ledger range: " << res.error(); + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::fetchAllTransactionsInLedger + */ + std::vector + fetchAllTransactionsInLedger(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override + { + auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence, yield); + return fetchTransactions(hashes, yield); + } + + /** + * @copydoc BackendInterface::fetchAllTransactionHashesInLedger + */ + std::vector + fetchAllTransactionHashesInLedger( + std::uint32_t const ledgerSequence, + boost::asio::yield_context yield + ) const override + { + auto start = std::chrono::system_clock::now(); + auto const res = executor_.read(yield, schema_->selectAllTransactionHashesInLedger, ledgerSequence); + + if (not res) { + LOG(log_.error()) << "Could not fetch all transaction hashes: " << res.error(); + return {}; + } + + auto const& result = res.value(); + if (not result.hasRows()) { + LOG(log_.warn()) << "Could not fetch all transaction hashes - no rows; ledger = " + << std::to_string(ledgerSequence); + return {}; + } + + std::vector hashes; + for (auto [hash] : extract(result)) + hashes.push_back(std::move(hash)); + + auto end = std::chrono::system_clock::now(); + LOG(log_.debug()) << "Fetched " << hashes.size() << " transaction hashes from database in " + << std::chrono::duration_cast(end - start).count() + << " milliseconds"; + + return hashes; + } + + /** + * @copydoc BackendInterface::fetchNFT + */ + std::optional + fetchNFT( + ripple::uint256 const& tokenID, + std::uint32_t const ledgerSequence, + boost::asio::yield_context yield + ) const override + { + auto const res = executor_.read(yield, schema_->selectNFT, tokenID, ledgerSequence); + if (not res) + return std::nullopt; + + if (auto const maybeRow = res->template get(); maybeRow) { + auto [seq, owner, isBurned] = *maybeRow; + auto result = std::make_optional(tokenID, seq, owner, isBurned); + + // now fetch URI. Usually we will have the URI even for burned NFTs, + // but if the first ledger on this clio included NFTokenBurn + // transactions we will not have the URIs for any of those tokens. + // In any other case not having the URI indicates something went + // wrong with our data. + // + // TODO - in the future would be great for any handlers that use + // this could inject a warning in this case (the case of not having + // a URI because it was burned in the first ledger) to indicate that + // even though we are returning a blank URI, the NFT might have had + // one. 
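+        // (If the URI read itself fails, the NFT is still returned with an
+        // empty uri, matching the burned-in-first-ledger case described above.)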
+ auto uriRes = executor_.read(yield, schema_->selectNFTURI, tokenID, ledgerSequence); + if (uriRes) { + if (auto const maybeUri = uriRes->template get(); maybeUri) + result->uri = *maybeUri; + } + + return result; + } + + LOG(log_.error()) << "Could not fetch NFT - no rows"; + return std::nullopt; + } + + /** + * @copydoc BackendInterface::fetchNFTTransactions + */ + TransactionsAndCursor + fetchNFTTransactions( + ripple::uint256 const& tokenID, + std::uint32_t const limit, + bool const forward, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const override + { + auto rng = fetchLedgerRange(); + if (!rng) + return {.txns = {}, .cursor = {}}; + + Statement const statement = [this, forward, &tokenID]() { + if (forward) + return schema_->selectNFTTxForward.bind(tokenID); + + return schema_->selectNFTTx.bind(tokenID); + }(); + + auto cursor = cursorIn; + if (cursor) { + statement.bindAt(1, cursor->asTuple()); + LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " tuple = " << cursor->ledgerSequence + << cursor->transactionIndex; + } else { + auto const seq = forward ? rng->minSequence : rng->maxSequence; + auto const placeHolder = forward ? 0 : std::numeric_limits::max(); + + statement.bindAt(1, std::make_tuple(placeHolder, placeHolder)); + LOG(log_.debug()) << "token_id = " << ripple::strHex(tokenID) << " idx = " << seq + << " tuple = " << placeHolder; + } + + statement.bindAt(2, Limit{limit}); + + auto const res = executor_.read(yield, statement); + auto const& results = res.value(); + if (not results.hasRows()) { + LOG(log_.debug()) << "No rows returned"; + return {}; + } + + std::vector hashes = {}; + auto numRows = results.numRows(); + LOG(log_.info()) << "num_rows = " << numRows; + + for (auto [hash, data] : extract>(results)) { + hashes.push_back(hash); + if (--numRows == 0) { + LOG(log_.debug()) << "Setting cursor"; + cursor = data; + + // forward queries by ledger/tx sequence `>=` + // so we have to advance the index by one + if (forward) + ++cursor->transactionIndex; + } + } + + auto const txns = fetchTransactions(hashes, yield); + LOG(log_.debug()) << "NFT Txns = " << txns.size(); + + if (txns.size() == limit) { + LOG(log_.debug()) << "Returning cursor"; + return {txns, cursor}; + } + + return {txns, {}}; + } + + /** + * @copydoc BackendInterface::fetchMPTHolders + */ + MPTHoldersAndCursor + fetchMPTHolders( + ripple::uint192 const& mptID, + std::uint32_t const limit, + std::optional const& cursorIn, + std::uint32_t const ledgerSequence, + boost::asio::yield_context yield + ) const override + { + auto const holderEntries = executor_.read( + yield, schema_->selectMPTHolders, mptID, cursorIn.value_or(ripple::AccountID(0)), Limit{limit} + ); + + auto const& holderResults = holderEntries.value(); + if (not holderResults.hasRows()) { + LOG(log_.debug()) << "No rows returned"; + return {}; + } + + std::vector mptKeys; + std::optional cursor; + for (auto const [holder] : extract(holderResults)) { + mptKeys.push_back(ripple::keylet::mptoken(mptID, holder).key); + cursor = holder; + } + + auto mptObjects = doFetchLedgerObjects(mptKeys, ledgerSequence, yield); + + auto it = std::remove_if(mptObjects.begin(), mptObjects.end(), [](Blob const& mpt) { return mpt.empty(); }); + + mptObjects.erase(it, mptObjects.end()); + + ASSERT(mptKeys.size() <= limit, "Number of keys can't exceed the limit"); + if (mptKeys.size() == limit) + return {mptObjects, cursor}; + + return {mptObjects, {}}; + } + + /** + * @copydoc BackendInterface::doFetchLedgerObject + */ + 
std::optional + doFetchLedgerObject( + ripple::uint256 const& key, + std::uint32_t const sequence, + boost::asio::yield_context yield + ) const override + { + LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key); + if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) { + if (auto const result = res->template get(); result) { + if (result->size()) + return result; + } else { + LOG(log_.debug()) << "Could not fetch ledger object - no rows"; + } + } else { + LOG(log_.error()) << "Could not fetch ledger object: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::doFetchLedgerObjectSeq + */ + std::optional + doFetchLedgerObjectSeq( + ripple::uint256 const& key, + std::uint32_t const sequence, + boost::asio::yield_context yield + ) const override + { + LOG(log_.debug()) << "Fetching ledger object for seq " << sequence << ", key = " << ripple::to_string(key); + if (auto const res = executor_.read(yield, schema_->selectObject, key, sequence); res) { + if (auto const result = res->template get(); result) { + auto [_, seq] = result.value(); + return seq; + } + LOG(log_.debug()) << "Could not fetch ledger object sequence - no rows"; + } else { + LOG(log_.error()) << "Could not fetch ledger object sequence: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::fetchTransaction + */ + std::optional + fetchTransaction(ripple::uint256 const& hash, boost::asio::yield_context yield) const override + { + if (auto const res = executor_.read(yield, schema_->selectTransaction, hash); res) { + if (auto const maybeValue = res->template get(); maybeValue) { + auto [transaction, meta, seq, date] = *maybeValue; + return std::make_optional(transaction, meta, seq, date); + } + + LOG(log_.debug()) << "Could not fetch transaction - no rows"; + } else { + LOG(log_.error()) << "Could not fetch transaction: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::doFetchSuccessorKey + */ + std::optional + doFetchSuccessorKey( + ripple::uint256 key, + std::uint32_t const ledgerSequence, + boost::asio::yield_context yield + ) const override + { + if (auto const res = executor_.read(yield, schema_->selectSuccessor, key, ledgerSequence); res) { + if (auto const result = res->template get(); result) { + if (*result == kLAST_KEY) + return std::nullopt; + return result; + } + + LOG(log_.debug()) << "Could not fetch successor - no rows"; + } else { + LOG(log_.error()) << "Could not fetch successor: " << res.error(); + } + + return std::nullopt; + } + + /** + * @copydoc BackendInterface::fetchTransactions + */ + std::vector + fetchTransactions(std::vector const& hashes, boost::asio::yield_context yield) const override + { + if (hashes.empty()) + return {}; + + auto const numHashes = hashes.size(); + std::vector results; + results.reserve(numHashes); + + std::vector statements; + statements.reserve(numHashes); + + auto const timeDiff = util::timed([this, yield, &results, &hashes, &statements]() { + // TODO: seems like a job for "hash IN (list of hashes)" instead? 
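+ // A single multi-partition "WHERE hash IN (...)" query would funnel
+ // all requested partitions through one coordinator node, whereas
+ // readEach below issues one single-partition read per hash, which
+ // generally spreads the load across the cluster more evenly.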
+ std::transform( + std::cbegin(hashes), std::cend(hashes), std::back_inserter(statements), [this](auto const& hash) { + return schema_->selectTransaction.bind(hash); + } + ); + + auto const entries = executor_.readEach(yield, statements); + std::transform( + std::cbegin(entries), + std::cend(entries), + std::back_inserter(results), + [](auto const& res) -> TransactionAndMetadata { + if (auto const maybeRow = res.template get(); maybeRow) + return *maybeRow; + + return {}; + } + ); + }); + + ASSERT(numHashes == results.size(), "Number of hashes and results must match"); + LOG(log_.debug()) << "Fetched " << numHashes << " transactions from database in " << timeDiff + << " milliseconds"; + return results; + } + + /** + * @copydoc BackendInterface::doFetchLedgerObjects + */ + std::vector + doFetchLedgerObjects( + std::vector const& keys, + std::uint32_t const sequence, + boost::asio::yield_context yield + ) const override + { + if (keys.empty()) + return {}; + + auto const numKeys = keys.size(); + LOG(log_.trace()) << "Fetching " << numKeys << " objects"; + + std::vector results; + results.reserve(numKeys); + + std::vector statements; + statements.reserve(numKeys); + + // TODO: seems like a job for "key IN (list of keys)" instead? + std::transform( + std::cbegin(keys), std::cend(keys), std::back_inserter(statements), [this, &sequence](auto const& key) { + return schema_->selectObject.bind(key, sequence); + } + ); + + auto const entries = executor_.readEach(yield, statements); + std::transform( + std::cbegin(entries), std::cend(entries), std::back_inserter(results), [](auto const& res) -> Blob { + if (auto const maybeValue = res.template get(); maybeValue) + return *maybeValue; + + return {}; + } + ); + + LOG(log_.trace()) << "Fetched " << numKeys << " objects"; + return results; + } + + /** + * @copydoc BackendInterface::fetchLedgerDiff + */ + std::vector + fetchLedgerDiff(std::uint32_t const ledgerSequence, boost::asio::yield_context yield) const override + { + auto const [keys, timeDiff] = util::timed([this, &ledgerSequence, yield]() -> std::vector { + auto const res = executor_.read(yield, schema_->selectDiff, ledgerSequence); + if (not res) { + LOG(log_.error()) << "Could not fetch ledger diff: " << res.error() << "; ledger = " << ledgerSequence; + return {}; + } + + auto const& results = res.value(); + if (not results) { + LOG(log_.error()) << "Could not fetch ledger diff - no rows; ledger = " << ledgerSequence; + return {}; + } + + std::vector resultKeys; + for (auto [key] : extract(results)) + resultKeys.push_back(key); + + return resultKeys; + }); + + // one of the above errors must have happened + if (keys.empty()) + return {}; + + LOG(log_.debug()) << "Fetched " << keys.size() << " diff hashes from database in " << timeDiff + << " milliseconds"; + + auto const objs = fetchLedgerObjects(keys, ledgerSequence, yield); + std::vector results; + results.reserve(keys.size()); + + std::transform( + std::cbegin(keys), + std::cend(keys), + std::cbegin(objs), + std::back_inserter(results), + [](auto const& key, auto const& obj) { return LedgerObject{key, obj}; } + ); + + return results; + } + + /** + * @copydoc BackendInterface::fetchMigratorStatus + */ + std::optional + fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const override + { + auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName)); + if (not res) { + LOG(log_.error()) << "Could not fetch migrator status: " << res.error(); + return {}; + } + + auto const& 
results = res.value();
+ if (not results) {
+ return {};
+ }
+
+ for (auto [statusString] : extract(results))
+ return statusString;
+
+ return {};
+ }
+
+ /**
+ * @copydoc BackendInterface::fetchClioNodesData
+ */
+ std::expected>, std::string>
+ fetchClioNodesData(boost::asio::yield_context yield) const override
+ {
+ auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
+ if (not readResult)
+ return std::unexpected{readResult.error().message()};
+
+ std::vector> result;
+
+ for (auto [uuid, message] : extract(*readResult)) {
+ result.emplace_back(uuid, std::move(message));
+ }
+
+ return result;
+ }
+
+ /**
+ * @copydoc BackendInterface::doWriteLedgerObject
+ */
+ void
+ doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
+ {
+ LOG(log_.trace()) << "Writing ledger object " << key.size() << ":" << seq << " [" << blob.size() << " bytes]";
+
+ if (range_)
+ executor_.write(schema_->insertDiff, seq, key);
+
+ executor_.write(schema_->insertObject, std::move(key), seq, std::move(blob));
+ }
+
+ /**
+ * @copydoc BackendInterface::writeSuccessor
+ */
+ void
+ writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override
+ {
+ LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. "
+ << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes.";
+ ASSERT(!key.empty(), "Key must not be empty");
+ ASSERT(!successor.empty(), "Successor must not be empty");
+
+ executor_.write(schema_->insertSuccessor, std::move(key), seq, std::move(successor));
+ }
+
+ /**
+ * @copydoc BackendInterface::writeAccountTransactions
+ */
+ void
+ writeAccountTransactions(std::vector data) override
+ {
+ std::vector statements;
+ statements.reserve(data.size() * 10); // assume ~10 affected accounts per transaction on average
+
+ for (auto& record : data) {
+ std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
+ return schema_->insertAccountTx.bind(
+ std::forward(account),
+ std::make_tuple(record.ledgerSequence, record.transactionIndex),
+ record.txHash
+ );
+ });
+ }
+
+ executor_.write(std::move(statements));
+ }
+
+ /**
+ * @copydoc BackendInterface::writeAccountTransaction
+ */
+ void
+ writeAccountTransaction(AccountTransactionsData record) override
+ {
+ std::vector statements;
+ statements.reserve(record.accounts.size());
+
+ std::ranges::transform(record.accounts, std::back_inserter(statements), [this, &record](auto&& account) {
+ return schema_->insertAccountTx.bind(
+ std::forward(account),
+ std::make_tuple(record.ledgerSequence, record.transactionIndex),
+ record.txHash
+ );
+ });
+
+ executor_.write(std::move(statements));
+ }
+
+ /**
+ * @copydoc BackendInterface::writeNFTTransactions
+ */
+ void
+ writeNFTTransactions(std::vector const& data) override
+ {
+ std::vector statements;
+ statements.reserve(data.size());
+
+ std::ranges::transform(data, std::back_inserter(statements), [this](auto const& record) {
+ return schema_->insertNFTTx.bind(
+ record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash
+ );
+ });
+
+ executor_.write(std::move(statements));
+ }
+
+ /**
+ * @copydoc BackendInterface::writeTransaction
+ */
+ void
+ writeTransaction(
+ std::string&& hash,
+ std::uint32_t const seq,
+ std::uint32_t const date,
+ std::string&& transaction,
+ std::string&& metadata
+ ) override
+ {
+ LOG(log_.trace()) << "Writing txn to database";
+
+ executor_.write(schema_->insertLedgerTransaction, seq,
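+ // (insertLedgerTransaction stores the hash keyed by ledger sequence
+ // for per-ledger scans; insertTransaction just below stores the full
+ // payload keyed by hash for point lookups - a denormalized double-write)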
hash);
+ executor_.write(
+ schema_->insertTransaction, std::move(hash), seq, date, std::move(transaction), std::move(metadata)
+ );
+ }
+
+ /**
+ * @copydoc BackendInterface::writeNFTs
+ */
+ void
+ writeNFTs(std::vector const& data) override
+ {
+ std::vector statements;
+ statements.reserve(data.size() * 3);
+
+ for (NFTsData const& record : data) {
+ if (!record.onlyUriChanged) {
+ statements.push_back(
+ schema_->insertNFT.bind(record.tokenID, record.ledgerSequence, record.owner, record.isBurned)
+ );
+
+ // If `uri` is set (and it can be set to an empty uri), we know this
+ // is a net-new NFT. That is, this NFT has not been seen before by
+ // us _OR_ it is in the extreme edge case of a re-minted NFT ID with
+ // the same NFT ID as an already-burned token. In this case, we need
+ // to record the URI and link to the issuer_nf_tokens table.
+ if (record.uri) {
+ statements.push_back(schema_->insertIssuerNFT.bind(
+ ripple::nft::getIssuer(record.tokenID),
+ static_cast(ripple::nft::getTaxon(record.tokenID)),
+ record.tokenID
+ ));
+ statements.push_back(
+ schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
+ );
+ }
+ } else {
+ // only the URI changed, so we update the URI table only
+ statements.push_back(
+ schema_->insertNFTURI.bind(record.tokenID, record.ledgerSequence, record.uri.value())
+ );
+ }
+ }
+
+ executor_.writeEach(std::move(statements));
+ }
+
+ /**
+ * @copydoc BackendInterface::writeMPTHolders
+ */
+ void
+ writeMPTHolders(std::vector const& data) override
+ {
+ std::vector statements;
+ statements.reserve(data.size());
+ for (auto [mptId, holder] : data)
+ statements.push_back(schema_->insertMPTHolder.bind(mptId, holder));
+
+ executor_.write(std::move(statements));
+ }
+
+ /**
+ * @copydoc BackendInterface::startWrites
+ */
+ void
+ startWrites() const override
+ {
+ // Note: this was a no-op in the original implementation as well;
+ // it was probably used in PostgreSQL to start a transaction or similar.
+ }
+
+ /**
+ * @copydoc BackendInterface::writeMigratorStatus
+ */
+ void
+ writeMigratorStatus(std::string const& migratorName, std::string const& status) override
+ {
+ executor_.writeSync(
+ schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status)
+ );
+ }
+
+ /**
+ * @copydoc BackendInterface::writeNodeMessage
+ */
+ void
+ writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override
+ {
+ executor_.writeSync(schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid);
+ }
+
+ /**
+ * @copydoc BackendInterface::isTooBusy
+ */
+ bool
+ isTooBusy() const override
+ {
+ return executor_.isTooBusy();
+ }
+
+ /**
+ * @copydoc BackendInterface::stats
+ */
+ boost::json::object
+ stats() const override
+ {
+ return executor_.stats();
+ }
+
+protected:
+ /**
+ * @brief Synchronously executes a conditional (LWT) update statement.
+ *
+ * @param statement The statement to execute
+ * @return true if the update was applied, false otherwise
+ */
+ bool
+ executeSyncUpdate(Statement statement)
+ {
+ auto const res = executor_.writeSync(statement);
+ auto maybeSuccess = res->template get();
+ if (not maybeSuccess) {
+ LOG(log_.error()) << "executeSyncUpdate - error getting result - no row";
+ return false;
+ }
+
+ if (not maybeSuccess.value()) {
+ LOG(log_.warn()) << "Update failed. Checking if DB state is what we expect";
+
+ // An error may indicate that another writer wrote something.
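+ // (For a conditional "IF ..." statement the server returns a result
+ // row whose boolean "[applied]" column reports whether the update took
+ // effect; that is what maybeSuccess reads above.)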
+ // In that case, compare the current state of things against what we
+ // were trying to write in the first place and use that as the source
+ // of truth for the result.
+ auto rng = hardFetchLedgerRangeNoThrow();
+ return rng && rng->maxSequence == ledgerSequence_;
+ }
+
+ return true;
+ }
+};
+
+} // namespace data::cassandra
diff --git a/src/data/cassandra/CassandraSchema.hpp b/src/data/cassandra/CassandraSchema.hpp
new file mode 100644
index 00000000..e207606b
--- /dev/null
+++ b/src/data/cassandra/CassandraSchema.hpp
@@ -0,0 +1,178 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of clio: https://github.com/XRPLF/clio
+ Copyright (c) 2025, the clio developers.
+
+ Permission to use, copy, modify, and distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#pragma once
+
+#include "data/cassandra/Concepts.hpp"
+#include "data/cassandra/Handle.hpp"
+#include "data/cassandra/Schema.hpp"
+#include "data/cassandra/SettingsProvider.hpp"
+#include "data/cassandra/Types.hpp"
+#include "util/log/Logger.hpp"
+
+#include 
+#include 
+
+#include 
+#include 
+
+namespace data::cassandra {
+
+/**
+ * @brief Manages the DB schema and provides access to prepared statements.
+ */
+template 
+class CassandraSchema : public Schema {
+ using Schema::Schema;
+
+public:
+ /**
+ * @brief Prepared statements specific to Cassandra/ScyllaDB.
+ */
+ struct CassandraStatements : public Schema::Statements {
+ using Schema::Statements::Statements;
+
+ //
+ // Update (and "delete") queries
+ //
+ PreparedStatement updateLedgerRange = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ UPDATE {}
+ SET sequence = ?
+ WHERE is_latest = ?
+ IF sequence IN (?, null)
+ )",
+ qualifiedTableName(settingsProvider_.get(), "ledger_range")
+ )
+ );
+ }();
+
+ //
+ // Select queries
+ //
+
+ PreparedStatement selectNFTIDsByIssuer = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ SELECT token_id
+ FROM {}
+ WHERE issuer = ?
+ AND (taxon, token_id) > ?
+ ORDER BY taxon ASC, token_id ASC
+ LIMIT ?
+ )",
+ qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
+ )
+ );
+ }();
+
+ PreparedStatement selectAccountFromBeginning = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ SELECT account
+ FROM {}
+ WHERE token(account) > 0
+ PER PARTITION LIMIT 1
+ LIMIT ?
+ )",
+ qualifiedTableName(settingsProvider_.get(), "account_tx")
+ )
+ );
+ }();
+
+ PreparedStatement selectAccountFromToken = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ SELECT account
+ FROM {}
+ WHERE token(account) > token(?)
+ PER PARTITION LIMIT 1
+ LIMIT ?
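+ -- note: token() is the partitioner's hash of the partition key, and
+ -- PER PARTITION LIMIT 1 keeps one row per account, so this statement
+ -- pages over distinct accounts in token order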
+ )", + qualifiedTableName(settingsProvider_.get(), "account_tx") + ) + ); + }(); + + PreparedStatement selectLedgerPageKeys = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT key + FROM {} + WHERE TOKEN(key) >= ? + AND sequence <= ? + PER PARTITION LIMIT 1 + LIMIT ? + ALLOW FILTERING + )", + qualifiedTableName(settingsProvider_.get(), "objects") + ) + ); + }(); + + PreparedStatement selectLedgerPage = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT object, key + FROM {} + WHERE TOKEN(key) >= ? + AND sequence <= ? + PER PARTITION LIMIT 1 + LIMIT ? + ALLOW FILTERING + )", + qualifiedTableName(settingsProvider_.get(), "objects") + ) + ); + }(); + }; + + void + prepareStatements(Handle const& handle) override + { + LOG(log_.info()) << "Preparing cassandra statements"; + statements_ = std::make_unique(settingsProvider_, handle); + LOG(log_.info()) << "Finished preparing statements"; + } + + /** + * @brief Provides access to statements. + * + * @return The statements + */ + std::unique_ptr const& + operator->() const + { + return statements_; + } + +private: + std::unique_ptr statements_{nullptr}; +}; + +} // namespace data::cassandra diff --git a/src/data/cassandra/KeyspaceSchema.hpp b/src/data/cassandra/KeyspaceSchema.hpp new file mode 100644 index 00000000..b4d08166 --- /dev/null +++ b/src/data/cassandra/KeyspaceSchema.hpp @@ -0,0 +1,140 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/cassandra/Concepts.hpp" +#include "data/cassandra/Handle.hpp" +#include "data/cassandra/Schema.hpp" +#include "data/cassandra/SettingsProvider.hpp" +#include "data/cassandra/Types.hpp" +#include "util/log/Logger.hpp" + +#include +#include + +#include +#include + +namespace data::cassandra { + +/** + * @brief Manages the DB schema and provides access to prepared statements. + */ +template +class KeyspaceSchema : public Schema { +public: + using Schema::Schema; + + /** + * @brief Construct a new Keyspace Schema object + * + * @param settingsProvider The settings provider + */ + struct KeyspaceStatements : public Schema::Statements { + using Schema::Statements::Statements; + + // + // Insert queries + // + PreparedStatement insertLedgerRange = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + INSERT INTO {} (is_latest, sequence) VALUES (?, ?) 
IF NOT EXISTS
+ )",
+ qualifiedTableName(settingsProvider_.get(), "ledger_range")
+ )
+ );
+ }();
+
+ //
+ // Update (and "delete") queries
+ //
+ PreparedStatement updateLedgerRange = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ UPDATE {}
+ SET sequence = ?
+ WHERE is_latest = ?
+ IF sequence = ?
+ )",
+ qualifiedTableName(settingsProvider_.get(), "ledger_range")
+ )
+ );
+ }();
+
+ //
+ // Select queries
+ //
+ PreparedStatement selectLedgerRange = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ SELECT sequence
+ FROM {}
+ WHERE is_latest IN (true, false)
+ )",
+ qualifiedTableName(settingsProvider_.get(), "ledger_range")
+ )
+ );
+ }();
+
+ PreparedStatement selectNFTsAfterTaxonKeyspaces = [this]() {
+ return handle_.get().prepare(
+ fmt::format(
+ R"(
+ SELECT token_id
+ FROM {}
+ WHERE issuer = ?
+ AND taxon > ?
+ ORDER BY taxon ASC, token_id ASC
+ LIMIT ?
+ )",
+ qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
+ )
+ );
+ }();
+ };
+
+ void
+ prepareStatements(Handle const& handle) override
+ {
+ LOG(log_.info()) << "Preparing AWS Keyspaces statements";
+ statements_ = std::make_unique(settingsProvider_, handle);
+ LOG(log_.info()) << "Finished preparing statements";
+ }
+
+ /**
+ * @brief Provides access to statements.
+ *
+ * @return The statements
+ */
+ std::unique_ptr const&
+ operator->() const
+ {
+ return statements_;
+ }
+
+private:
+ std::unique_ptr statements_{nullptr};
+};
+
+} // namespace data::cassandra
diff --git a/src/data/cassandra/Schema.hpp b/src/data/cassandra/Schema.hpp
index 0ead25db..d89952c4 100644
--- a/src/data/cassandra/Schema.hpp
+++ b/src/data/cassandra/Schema.hpp
@@ -24,10 +24,10 @@
 #include "data/cassandra/Types.hpp"
 #include "util/log/Logger.hpp"
 
+#include 
 #include 
 #include 
 
-#include 
 #include 
 #include 
 #include 
@@ -53,12 +53,15 @@ template
 */
 template 
 class Schema {
+protected:
 util::Logger log_{"Backend"};
 std::reference_wrapper settingsProvider_;
 
 public:
+ virtual ~Schema() = default;
+
 /**
- * @brief Construct a new Schema object
+ * @brief Construct a new Schema object; shared by all Schema classes (Cassandra and Keyspace)
 *
 * @param settingsProvider The settings provider
 */
@@ -334,6 +337,7 @@ public:
 * @brief Prepared statements holder.
 */
 class Statements {
+ protected:
 std::reference_wrapper settingsProvider_;
 std::reference_wrapper handle_;
@@ -526,20 +530,6 @@ public:
 // Update (and "delete") queries
 //
- PreparedStatement updateLedgerRange = [this]() {
- return handle_.get().prepare(
- fmt::format(
- R"(
- UPDATE {}
- SET sequence = ?
- WHERE is_latest = ?
- IF sequence IN (?, null)
- )",
- qualifiedTableName(settingsProvider_.get(), "ledger_range")
- )
- );
- }();
-
 PreparedStatement deleteLedgerRange = [this]() {
 return handle_.get().prepare(
 fmt::format(
@@ -654,40 +644,6 @@ public:
 );
 }();
-
- PreparedStatement selectLedgerPageKeys = [this]() {
- return handle_.get().prepare(
- fmt::format(
- R"(
- SELECT key
- FROM {}
- WHERE TOKEN(key) >= ?
- AND sequence <= ?
- PER PARTITION LIMIT 1
- LIMIT ?
- ALLOW FILTERING
- )",
- qualifiedTableName(settingsProvider_.get(), "objects")
- )
- );
- }();
-
- PreparedStatement selectLedgerPage = [this]() {
- return handle_.get().prepare(
- fmt::format(
- R"(
- SELECT object, key
- FROM {}
- WHERE TOKEN(key) >= ?
- AND sequence <= ?
- PER PARTITION LIMIT 1
- LIMIT ?
- ALLOW FILTERING
- )",
- qualifiedTableName(settingsProvider_.get(), "objects")
- )
- );
- }();
-
 PreparedStatement getToken = [this]() {
 return handle_.get().prepare(
 fmt::format(
@@ -717,36 +673,6 @@
 );
 }();
-
- PreparedStatement selectAccountFromBeginning = [this]() {
- return handle_.get().prepare(
- fmt::format(
- R"(
- SELECT account
- FROM {}
- WHERE token(account) > 0
- PER PARTITION LIMIT 1
- LIMIT ?
- )",
- qualifiedTableName(settingsProvider_.get(), "account_tx")
- )
- );
- }();
-
- PreparedStatement selectAccountFromToken = [this]() {
- return handle_.get().prepare(
- fmt::format(
- R"(
- SELECT account
- FROM {}
- WHERE token(account) > token(?)
- PER PARTITION LIMIT 1
- LIMIT ?
- )",
- qualifiedTableName(settingsProvider_.get(), "account_tx")
- )
- );
- }();
-
 PreparedStatement selectAccountTxForward = [this]() {
 return handle_.get().prepare(
 fmt::format(
@@ -827,22 +753,6 @@
 );
 }();
-
- PreparedStatement selectNFTIDsByIssuer = [this]() {
- return handle_.get().prepare(
- fmt::format(
- R"(
- SELECT token_id
- FROM {}
- WHERE issuer = ?
- AND (taxon, token_id) > ?
- ORDER BY taxon ASC, token_id ASC
- LIMIT ?
- )",
- qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
- )
- );
- }();
-
 PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
 return handle_.get().prepare(
 fmt::format(
@@ -960,27 +870,8 @@
 *
 * @param handle The handle to the DB
 */
- void
- prepareStatements(Handle const& handle)
- {
- LOG(log_.info()) << "Preparing cassandra statements";
- statements_ = std::make_unique(settingsProvider_, handle);
- LOG(log_.info()) << "Finished preparing statements";
- }
-
- /**
- * @brief Provides access to statements.
- *
- * @return The statements
- */
- std::unique_ptr const&
- operator->() const
- {
- return statements_;
- }
-
-private:
- std::unique_ptr statements_{nullptr};
+ virtual void
+ prepareStatements(Handle const& handle) = 0;
 };
 
 } // namespace data::cassandra
diff --git a/src/data/cassandra/SettingsProvider.cpp b/src/data/cassandra/SettingsProvider.cpp
index 0eda7215..889633e4 100644
--- a/src/data/cassandra/SettingsProvider.cpp
+++ b/src/data/cassandra/SettingsProvider.cpp
@@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const
 settings.coreConnectionsPerHost = config_.get("core_connections_per_host");
 settings.queueSizeIO = config_.maybeValue("queue_size_io");
 settings.writeBatchSize = config_.get("write_batch_size");
+ settings.provider = config_.get("provider");
 
 if (config_.getValueView("connect_timeout").hasValue()) {
 auto const connectTimeoutSecond = config_.get("connect_timeout");
diff --git a/src/data/cassandra/impl/Batch.cpp b/src/data/cassandra/impl/Batch.cpp
index b26e889c..48841a27 100644
--- a/src/data/cassandra/impl/Batch.cpp
+++ b/src/data/cassandra/impl/Batch.cpp
@@ -36,9 +36,18 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
 
 namespace data::cassandra::impl {
 
-// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
+/*
+ * There are two main batch types for Cassandra statements:
+ * LOGGED: ensures that all updates in the batch succeed together, or none do.
+ * Use this for critical, related changes (e.g., for the same user), but it is slower.
+ *
+ * UNLOGGED: for performance. Sends many separate updates in one network trip.
+ * Use this for bulk-loading unrelated data, but note that there is NO all-or-nothing guarantee.
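+ *
+ * In this codebase the batched writes (objects, successors, account_tx
+ * and similar) touch unrelated partitions, so UNLOGGED is the better
+ * fit; writes that need stronger guarantees, such as the ledger_range
+ * update, go through conditional (IF ...) statements instead.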
+ * + * More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html + */ Batch::Batch(std::vector const& statements) - : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER} + : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER} { cass_batch_set_is_idempotent(*this, cass_true); diff --git a/src/data/cassandra/impl/Cluster.cpp b/src/data/cassandra/impl/Cluster.cpp index 05afc145..7dcf7e9a 100644 --- a/src/data/cassandra/impl/Cluster.cpp +++ b/src/data/cassandra/impl/Cluster.cpp @@ -60,6 +60,17 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count()); cass_cluster_set_request_timeout(*this, settings.requestTimeout.count()); + // TODO: AWS keyspace reads should be local_one to save cost + if (settings.provider == toString(cassandra::impl::Provider::Keyspace)) { + if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) { + throw std::runtime_error(fmt::format("Error setting keyspace consistency: {}", cass_error_desc(rc))); + } + } else { + if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_QUORUM); rc != CASS_OK) { + throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc))); + } + } + if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost); rc != CASS_OK) { throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc))); diff --git a/src/data/cassandra/impl/Cluster.hpp b/src/data/cassandra/impl/Cluster.hpp index 61bdbfd5..bd137650 100644 --- a/src/data/cassandra/impl/Cluster.hpp +++ b/src/data/cassandra/impl/Cluster.hpp @@ -31,10 +31,29 @@ #include #include #include +#include #include namespace data::cassandra::impl { +namespace { + +enum class Provider { Cassandra, Keyspace }; + +inline std::string +toString(Provider provider) +{ + switch (provider) { + case Provider::Cassandra: + return "cassandra"; + case Provider::Keyspace: + return "aws_keyspace"; + } + std::unreachable(); +} + +} // namespace + // TODO: move Settings to public interface, not impl /** @@ -45,6 +64,7 @@ struct Settings { static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000; static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000; static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20; + static constexpr Provider kDEFAULT_PROVIDER = Provider::Cassandra; /** * @brief Represents the configuration of contact points for cassandra. 
@@ -83,11 +103,14 @@ struct Settings {
 uint32_t maxReadRequestsOutstanding = kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
 
 /** @brief The number of connections per host to always have active */
- uint32_t coreConnectionsPerHost = 1u;
+ uint32_t coreConnectionsPerHost = 3u;
 
 /** @brief Size of batches when writing */
 std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
 
+ /** @brief Provider; distinguishes Cassandra/ScyllaDB from AWS Keyspaces */
+ std::string provider = toString(kDEFAULT_PROVIDER);
+
 /** @brief Size of the IO queue */
 std::optional queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init)
diff --git a/src/data/cassandra/impl/Statement.hpp b/src/data/cassandra/impl/Statement.hpp
index d184f643..6998c741 100644
--- a/src/data/cassandra/impl/Statement.hpp
+++ b/src/data/cassandra/impl/Statement.hpp
@@ -58,14 +58,16 @@ public:
 explicit Statement(std::string_view query, Args&&... args)
 : ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
 {
- cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
+ // TODO: figure out how to set the consistency level in config
+ // NOTE: AWS Keyspaces doesn't support QUORUM consistency for writes
+ // cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
 cass_statement_set_is_idempotent(*this, cass_true);
 bind(std::forward(args)...);
 }
 
 /* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
 {
- cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
+ // cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
 cass_statement_set_is_idempotent(*this, cass_true);
 }
diff --git a/src/util/config/ConfigConstraints.hpp b/src/util/config/ConfigConstraints.hpp
index 4f64d0b6..3ee89ee9 100644
--- a/src/util/config/ConfigConstraints.hpp
+++ b/src/util/config/ConfigConstraints.hpp
@@ -84,6 +84,11 @@ static constexpr std::array kDATABASE_TYPE = {"cassandra"};
 */
 static constexpr std::array kPROCESSING_POLICY = {"parallel", "sequent"};
 
+/**
+ * @brief Specific values that are accepted for the database provider in config.
+ */
+static constexpr std::array kPROVIDER = {"cassandra", "aws_keyspace"};
+
 /**
 * @brief An interface to enforce constraints on certain values within ClioConfigDefinition.
*/ @@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE}; static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE}; static constinit OneOf gValidateLogTag{"log.tag_style", kLOG_TAGS}; static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY}; +static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER}; static constinit PositiveDouble gValidatePositiveDouble{}; diff --git a/src/util/config/ConfigDefinition.cpp b/src/util/config/ConfigDefinition.cpp index 2ef754d3..652f3715 100644 --- a/src/util/config/ConfigDefinition.cpp +++ b/src/util/config/ConfigDefinition.cpp @@ -285,6 +285,8 @@ getClioConfig() {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()}, + {"database.cassandra.provider", + ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateProvider)}, {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, diff --git a/src/util/config/ConfigDescription.hpp b/src/util/config/ConfigDescription.hpp index c7975bee..963169ac 100644 --- a/src/util/config/ConfigDescription.hpp +++ b/src/util/config/ConfigDescription.hpp @@ -173,6 +173,7 @@ This document provides a list of all available Clio configuration properties in "Maximum number of outstanding read requests. Read requests are API calls that read from the database."}, KV{.key = "database.cassandra.threads", .value = "Represents the number of threads that will be used for database operations."}, + KV{.key = "database.cassandra.provider", .value = "The specific database backend provider we are using."}, KV{.key = "database.cassandra.core_connections_per_host", .value = "The number of core connections per host for the Cassandra database."}, KV{.key = "database.cassandra.queue_size_io", diff --git a/tests/integration/data/BackendFactoryTests.cpp b/tests/integration/data/BackendFactoryTests.cpp index ab97cd6d..359a3b03 100644 --- a/tests/integration/data/BackendFactoryTests.cpp +++ b/tests/integration/data/BackendFactoryTests.cpp @@ -44,6 +44,7 @@ using namespace util::config; struct BackendCassandraFactoryTest : SyncAsioContextTest, util::prometheus::WithPrometheus { static constexpr auto kKEYSPACE = "factory_test"; + static constexpr auto kPROVIDER = "cassandra"; protected: ClioConfigDefinition cfg_{ @@ -53,6 +54,7 @@ protected: {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kPROVIDER)}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)}, diff --git a/tests/integration/data/cassandra/BackendTests.cpp b/tests/integration/data/cassandra/BackendTests.cpp index cd79da21..3e5a6ac6 100644 --- a/tests/integration/data/cassandra/BackendTests.cpp +++ b/tests/integration/data/cassandra/BackendTests.cpp @@ -93,6 +93,7 @@ protected: 
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)}, diff --git a/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp b/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp index 4a3a95fa..8b0325a2 100644 --- a/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp +++ b/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp @@ -102,6 +102,7 @@ protected: ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)}, diff --git a/tests/unit/data/cassandra/SettingsProviderTests.cpp b/tests/unit/data/cassandra/SettingsProviderTests.cpp index f9c17eaf..8fe36392 100644 --- a/tests/unit/data/cassandra/SettingsProviderTests.cpp +++ b/tests/unit/data/cassandra/SettingsProviderTests.cpp @@ -58,6 +58,7 @@ getParseSettingsConfig(boost::json::value val) {"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)}, {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},