mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 03:45:50 +00:00 
			
		
		
		
	refactor: Keyspace comments (#2684)
Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
This commit is contained in:
		@@ -46,6 +46,7 @@ namespace data {
 | 
			
		||||
inline std::shared_ptr<BackendInterface>
 | 
			
		||||
makeBackend(util::config::ClioConfigDefinition const& config, data::LedgerCacheInterface& cache)
 | 
			
		||||
{
 | 
			
		||||
    using namespace cassandra::impl;
 | 
			
		||||
    static util::Logger const log{"Backend"};  // NOLINT(readability-identifier-naming)
 | 
			
		||||
    LOG(log.info()) << "Constructing BackendInterface";
 | 
			
		||||
 | 
			
		||||
@@ -56,7 +57,7 @@ makeBackend(util::config::ClioConfigDefinition const& config, data::LedgerCacheI
 | 
			
		||||
 | 
			
		||||
    if (boost::iequals(type, "cassandra")) {
 | 
			
		||||
        auto const cfg = config.getObject("database." + type);
 | 
			
		||||
        if (cfg.getValueView("provider").asString() == toString(cassandra::impl::Provider::Keyspace)) {
 | 
			
		||||
        if (providerFromString(cfg.getValueView("provider").asString()) == Provider::Keyspace) {
 | 
			
		||||
            backend = std::make_shared<data::cassandra::KeyspaceBackend>(
 | 
			
		||||
                data::cassandra::SettingsProvider{cfg}, cache, readOnly
 | 
			
		||||
            );
 | 
			
		||||
 
 | 
			
		||||
@@ -189,10 +189,11 @@ public:
 | 
			
		||||
        auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
 | 
			
		||||
 | 
			
		||||
        for (auto i = 0u; i < nftIDs.size(); i++) {
 | 
			
		||||
            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
 | 
			
		||||
            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
 | 
			
		||||
                maybeRow.has_value()) {
 | 
			
		||||
                auto [seq, owner, isBurned] = *maybeRow;
 | 
			
		||||
                NFT nft(nftIDs[i], seq, owner, isBurned);
 | 
			
		||||
                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
 | 
			
		||||
                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri.has_value())
 | 
			
		||||
                    nft.uri = *maybeUri;
 | 
			
		||||
                ret.nfts.push_back(nft);
 | 
			
		||||
            }
 | 
			
		||||
 
 | 
			
		||||
@@ -57,9 +57,9 @@ namespace data::cassandra {
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Implements @ref CassandraBackendFamily for Keyspace
 | 
			
		||||
 *
 | 
			
		||||
 * @tparam SettingsProviderType The settings provider type to use
 | 
			
		||||
 * @tparam ExecutionStrategyType The execution strategy type to use
 | 
			
		||||
 * @tparam FetchLedgerCacheType The ledger header cache type to use
 | 
			
		||||
 * @tparam SettingsProviderType The settings provider type
 | 
			
		||||
 * @tparam ExecutionStrategyType The execution strategy type
 | 
			
		||||
 * @tparam FetchLedgerCacheType The ledger header cache type
 | 
			
		||||
 */
 | 
			
		||||
template <
 | 
			
		||||
    SomeSettingsProvider SettingsProviderType,
 | 
			
		||||
@@ -101,9 +101,9 @@ public:
 | 
			
		||||
        // !range_.has_value() means the table 'ledger_range' is not populated;
 | 
			
		||||
        // This would be the first write to the table.
 | 
			
		||||
        // In this case, insert both min_sequence/max_sequence range into the table.
 | 
			
		||||
        if (not(range_.has_value())) {
 | 
			
		||||
            executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
 | 
			
		||||
            executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
 | 
			
		||||
        if (not range_.has_value()) {
 | 
			
		||||
            executor_.writeSync(schema_->insertLedgerRange, /* isLatestLedger =*/false, ledgerSequence_);
 | 
			
		||||
            executor_.writeSync(schema_->insertLedgerRange, /* isLatestLedger =*/true, ledgerSequence_);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (not this->executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
 | 
			
		||||
@@ -130,7 +130,7 @@ public:
 | 
			
		||||
            // Keyspace and ScyllaDB use the same logic for taxon-filtered queries
 | 
			
		||||
            nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
 | 
			
		||||
        } else {
 | 
			
		||||
            // --- Amazon Keyspaces Workflow for non-taxon queries ---
 | 
			
		||||
            // Amazon Keyspaces Workflow for non-taxon queries
 | 
			
		||||
            auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
 | 
			
		||||
            auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
 | 
			
		||||
 | 
			
		||||
@@ -140,8 +140,8 @@ public:
 | 
			
		||||
            firstQuery.bindAt(3, Limit{limit});
 | 
			
		||||
 | 
			
		||||
            auto const firstRes = executor_.read(yield, firstQuery);
 | 
			
		||||
            if (firstRes) {
 | 
			
		||||
                for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
 | 
			
		||||
            if (firstRes.has_value()) {
 | 
			
		||||
                for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
 | 
			
		||||
                    nftIDs.push_back(nftID);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@@ -152,8 +152,8 @@ public:
 | 
			
		||||
                secondQuery.bindAt(2, Limit{remainingLimit});
 | 
			
		||||
 | 
			
		||||
                auto const secondRes = executor_.read(yield, secondQuery);
 | 
			
		||||
                if (secondRes) {
 | 
			
		||||
                    for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
 | 
			
		||||
                if (secondRes.has_value()) {
 | 
			
		||||
                    for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
 | 
			
		||||
                        nftIDs.push_back(nftID);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@@ -163,7 +163,7 @@ public:
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief (Unsupported in Keyspaces) Fetches account root object indexes by page.
 | 
			
		||||
     * * @note Loading the cache by enumerating all accounts is currently unsupported by the AWS Keyspaces backend.
 | 
			
		||||
     * @note Loading the cache by enumerating all accounts is currently unsupported by the AWS Keyspaces backend.
 | 
			
		||||
     * This function's logic relies on "PER PARTITION LIMIT 1", which Keyspaces does not support, and there is
 | 
			
		||||
     * no efficient alternative. This is acceptable as the cache is primarily loaded via diffs. Calling this
 | 
			
		||||
     * function will throw an exception.
 | 
			
		||||
@@ -203,8 +203,8 @@ private:
 | 
			
		||||
        statement.bindAt(3, Limit{limit});
 | 
			
		||||
 | 
			
		||||
        auto const res = executor_.read(yield, statement);
 | 
			
		||||
        if (res && res.value().hasRows()) {
 | 
			
		||||
            for (auto const [nftID] : extract<ripple::uint256>(res.value()))
 | 
			
		||||
        if (res.has_value() && res->hasRows()) {
 | 
			
		||||
            for (auto const [nftID] : extract<ripple::uint256>(*res))
 | 
			
		||||
                nftIDs.push_back(nftID);
 | 
			
		||||
        }
 | 
			
		||||
        return nftIDs;
 | 
			
		||||
@@ -229,8 +229,8 @@ private:
 | 
			
		||||
        firstQuery.bindAt(3, Limit{limit});
 | 
			
		||||
 | 
			
		||||
        auto const firstRes = executor_.read(yield, firstQuery);
 | 
			
		||||
        if (firstRes) {
 | 
			
		||||
            for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
 | 
			
		||||
        if (firstRes.has_value()) {
 | 
			
		||||
            for (auto const [nftID] : extract<ripple::uint256>(*firstRes))
 | 
			
		||||
                nftIDs.push_back(nftID);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -241,8 +241,8 @@ private:
 | 
			
		||||
            secondQuery.bindAt(2, Limit{remainingLimit});
 | 
			
		||||
 | 
			
		||||
            auto const secondRes = executor_.read(yield, secondQuery);
 | 
			
		||||
            if (secondRes) {
 | 
			
		||||
                for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
 | 
			
		||||
            if (secondRes.has_value()) {
 | 
			
		||||
                for (auto const [nftID] : extract<ripple::uint256>(*secondRes))
 | 
			
		||||
                    nftIDs.push_back(nftID);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@@ -291,10 +291,11 @@ private:
 | 
			
		||||
 | 
			
		||||
        // Combine the results into final NFT objects.
 | 
			
		||||
        for (auto i = 0u; i < nftIDs.size(); ++i) {
 | 
			
		||||
            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
 | 
			
		||||
            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>();
 | 
			
		||||
                maybeRow.has_value()) {
 | 
			
		||||
                auto [seq, owner, isBurned] = *maybeRow;
 | 
			
		||||
                NFT nft(nftIDs[i], seq, owner, isBurned);
 | 
			
		||||
                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
 | 
			
		||||
                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri.has_value())
 | 
			
		||||
                    nft.uri = *maybeUri;
 | 
			
		||||
                ret.nfts.push_back(nft);
 | 
			
		||||
            }
 | 
			
		||||
 
 | 
			
		||||
@@ -70,10 +70,10 @@ namespace data::cassandra {
 | 
			
		||||
 *
 | 
			
		||||
 * Note: This is a safer and more correct rewrite of the original implementation of the backend.
 | 
			
		||||
 *
 | 
			
		||||
 * @tparam SettingsProviderType The settings provider type to use
 | 
			
		||||
 * @tparam ExecutionStrategyType The execution strategy type to use
 | 
			
		||||
 * @tparam SchemaType The Schema type to use
 | 
			
		||||
 * @tparam FetchLedgerCacheType The ledger header cache type to use
 | 
			
		||||
 * @tparam SettingsProviderType The settings provider type
 | 
			
		||||
 * @tparam ExecutionStrategyType The execution strategy type
 | 
			
		||||
 * @tparam SchemaType The Schema type
 | 
			
		||||
 * @tparam FetchLedgerCacheType The ledger header cache type
 | 
			
		||||
 */
 | 
			
		||||
template <
 | 
			
		||||
    SomeSettingsProvider SettingsProviderType,
 | 
			
		||||
@@ -100,8 +100,8 @@ public:
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Create a new cassandra/scylla backend instance.
 | 
			
		||||
     *
 | 
			
		||||
     * @param settingsProvider The settings provider to use
 | 
			
		||||
     * @param cache The ledger cache to use
 | 
			
		||||
     * @param settingsProvider The settings provider
 | 
			
		||||
     * @param cache The ledger cache
 | 
			
		||||
     * @param readOnly Whether the database should be in readonly mode
 | 
			
		||||
     */
 | 
			
		||||
    CassandraBackendFamily(SettingsProviderType settingsProvider, data::LedgerCacheInterface& cache, bool readOnly)
 | 
			
		||||
@@ -111,18 +111,18 @@ public:
 | 
			
		||||
        , handle_{settingsProvider_.getSettings()}
 | 
			
		||||
        , executor_{settingsProvider_.getSettings(), handle_}
 | 
			
		||||
    {
 | 
			
		||||
        if (auto const res = handle_.connect(); not res)
 | 
			
		||||
        if (auto const res = handle_.connect(); not res.has_value())
 | 
			
		||||
            throw std::runtime_error("Could not connect to database: " + res.error());
 | 
			
		||||
 | 
			
		||||
        if (not readOnly) {
 | 
			
		||||
            if (auto const res = handle_.execute(schema_.createKeyspace); not res) {
 | 
			
		||||
            if (auto const res = handle_.execute(schema_.createKeyspace); not res.has_value()) {
 | 
			
		||||
                // on datastax, creation of keyspaces can be configured to only be done through the admin
 | 
			
		||||
                // interface. this does not mean that the keyspace does not already exist though.
 | 
			
		||||
                if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
 | 
			
		||||
                    throw std::runtime_error("Could not create keyspace: " + res.error());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (auto const res = handle_.executeEach(schema_.createSchema); not res)
 | 
			
		||||
            if (auto const res = handle_.executeEach(schema_.createSchema); not res.has_value())
 | 
			
		||||
                throw std::runtime_error("Could not create schema: " + res.error());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -233,10 +233,10 @@ public:
 | 
			
		||||
    std::optional<std::uint32_t>
 | 
			
		||||
    fetchLatestLedgerSequence(boost::asio::yield_context yield) const override
 | 
			
		||||
    {
 | 
			
		||||
        if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res) {
 | 
			
		||||
            if (auto const& result = res.value(); result) {
 | 
			
		||||
                if (auto const maybeValue = result.template get<uint32_t>(); maybeValue)
 | 
			
		||||
                    return maybeValue;
 | 
			
		||||
        if (auto const res = executor_.read(yield, schema_->selectLatestLedger); res.has_value()) {
 | 
			
		||||
            if (auto const& rows = *res; rows) {
 | 
			
		||||
                if (auto const maybeRow = rows.template get<uint32_t>(); maybeRow.has_value())
 | 
			
		||||
                    return maybeRow;
 | 
			
		||||
 | 
			
		||||
                LOG(log_.error()) << "Could not fetch latest ledger - no rows";
 | 
			
		||||
                return std::nullopt;
 | 
			
		||||
 
 | 
			
		||||
@@ -97,7 +97,7 @@ SettingsProvider::parseSettings() const
 | 
			
		||||
    settings.coreConnectionsPerHost = config_.get<uint32_t>("core_connections_per_host");
 | 
			
		||||
    settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
 | 
			
		||||
    settings.writeBatchSize = config_.get<std::size_t>("write_batch_size");
 | 
			
		||||
    settings.provider = config_.get<std::string>("provider");
 | 
			
		||||
    settings.provider = impl::providerFromString(config_.get<std::string>("provider"));
 | 
			
		||||
 | 
			
		||||
    if (config_.getValueView("connect_timeout").hasValue()) {
 | 
			
		||||
        auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");
 | 
			
		||||
 
 | 
			
		||||
@@ -61,7 +61,7 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k
 | 
			
		||||
    cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
 | 
			
		||||
 | 
			
		||||
    // TODO: AWS keyspace reads should be local_one to save cost
 | 
			
		||||
    if (settings.provider == toString(cassandra::impl::Provider::Keyspace)) {
 | 
			
		||||
    if (settings.provider == cassandra::impl::Provider::Keyspace) {
 | 
			
		||||
        if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) {
 | 
			
		||||
            throw std::runtime_error(fmt::format("Error setting keyspace consistency: {}", cass_error_desc(rc)));
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -20,6 +20,7 @@
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "data/cassandra/impl/ManagedObject.hpp"
 | 
			
		||||
#include "util/Assert.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
 | 
			
		||||
#include <cassandra.h>
 | 
			
		||||
@@ -31,29 +32,22 @@
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <string_view>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <utility>
 | 
			
		||||
#include <variant>
 | 
			
		||||
 | 
			
		||||
namespace data::cassandra::impl {
 | 
			
		||||
 | 
			
		||||
namespace {
 | 
			
		||||
 | 
			
		||||
enum class Provider { Cassandra, Keyspace };
 | 
			
		||||
 | 
			
		||||
inline std::string
 | 
			
		||||
toString(Provider provider)
 | 
			
		||||
inline Provider
 | 
			
		||||
providerFromString(std::string const& provider)
 | 
			
		||||
{
 | 
			
		||||
    switch (provider) {
 | 
			
		||||
        case Provider::Cassandra:
 | 
			
		||||
            return "cassandra";
 | 
			
		||||
        case Provider::Keyspace:
 | 
			
		||||
            return "aws_keyspace";
 | 
			
		||||
    }
 | 
			
		||||
    std::unreachable();
 | 
			
		||||
    ASSERT(
 | 
			
		||||
        provider == "cassandra" || provider == "aws_keyspace",
 | 
			
		||||
        "Provider type must be one of 'cassandra' or 'aws_keyspace'"
 | 
			
		||||
    );
 | 
			
		||||
    return provider == "cassandra" ? Provider::Cassandra : Provider::Keyspace;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace
 | 
			
		||||
 | 
			
		||||
// TODO: move Settings to public interface, not impl
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
@@ -109,7 +103,7 @@ struct Settings {
 | 
			
		||||
    std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
 | 
			
		||||
 | 
			
		||||
    /** @brief Provider to know if we are using scylladb or keyspace */
 | 
			
		||||
    std::string provider = toString(kDEFAULT_PROVIDER);
 | 
			
		||||
    Provider provider = kDEFAULT_PROVIDER;
 | 
			
		||||
 | 
			
		||||
    /** @brief Size of the IO queue */
 | 
			
		||||
    std::optional<uint32_t> queueSizeIO = std::nullopt;  // NOLINT(readability-redundant-member-init)
 | 
			
		||||
 
 | 
			
		||||
@@ -85,15 +85,17 @@ using namespace data::cassandra;
 | 
			
		||||
 | 
			
		||||
class BackendCassandraTestBase : public SyncAsioContextTest, public WithPrometheus {
 | 
			
		||||
protected:
 | 
			
		||||
    static constexpr auto kCASSANDRA = "cassandra";
 | 
			
		||||
 | 
			
		||||
    ClioConfigDefinition cfg_{
 | 
			
		||||
        {"database.type", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
        {"database.type", ConfigValue{ConfigType::String}.defaultValue(kCASSANDRA)},
 | 
			
		||||
        {"database.cassandra.contact_points",
 | 
			
		||||
         ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
 | 
			
		||||
        {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
 | 
			
		||||
        {"database.cassandra.keyspace",
 | 
			
		||||
         ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
 | 
			
		||||
        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kCASSANDRA)},
 | 
			
		||||
        {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
        {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},
 | 
			
		||||
 
 | 
			
		||||
@@ -95,14 +95,15 @@ class MigrationCassandraSimpleTest : public WithPrometheus {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
    ClioConfigDefinition cfg_{
 | 
			
		||||
    static constexpr auto kCASSANDRA = "cassandra";
 | 
			
		||||
 | 
			
		||||
        {{"database.type", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
    ClioConfigDefinition cfg_{
 | 
			
		||||
        {{"database.type", ConfigValue{ConfigType::String}.defaultValue(kCASSANDRA)},
 | 
			
		||||
         {"database.cassandra.contact_points",
 | 
			
		||||
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
 | 
			
		||||
         {"database.cassandra.keyspace",
 | 
			
		||||
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
 | 
			
		||||
         {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
         {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kCASSANDRA)},
 | 
			
		||||
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
         {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user