From bbd2884e3b593211af7ced7fccd555760be7a196 Mon Sep 17 00:00:00 2001 From: Peter Chen Date: Mon, 18 Aug 2025 15:38:56 -0400 Subject: [PATCH] feat: Support Keyspace --- docs/config-description.md | 8 + src/data/CassandraBackend.hpp | 101 +++++++++---- src/data/cassandra/Schema.hpp | 139 ++++++++++++------ src/data/cassandra/SettingsProvider.cpp | 1 + src/data/cassandra/impl/Batch.cpp | 2 +- src/data/cassandra/impl/Cluster.cpp | 5 + src/data/cassandra/impl/Cluster.hpp | 4 + src/data/cassandra/impl/Statement.hpp | 4 +- src/util/config/ConfigConstraints.hpp | 6 + src/util/config/ConfigDefinition.cpp | 2 + src/util/config/ConfigDescription.hpp | 3 + .../data/cassandra/SettingsProviderTests.cpp | 1 + 12 files changed, 194 insertions(+), 82 deletions(-) diff --git a/docs/config-description.md b/docs/config-description.md index a071aa3f..34fb6acf 100644 --- a/docs/config-description.md +++ b/docs/config-description.md @@ -87,6 +87,14 @@ This document provides a list of all available Clio configuration properties in - **Constraints**: The minimum value is `1`. The maximum value is `4294967295`. - **Description**: Represents the number of threads that will be used for database operations. +### database.cassandra.provider + +- **Required**: True +- **Type**: string +- **Default value**: `scylladb` +- **Constraints**: The value must be one of the following: `scylladb`, `aws_keyspace`. +- **Description**: The specific database backend provider we are using. Currently we only support scylladb, or aws_keyspace. 
+ ### database.cassandra.core_connections_per_host - **Required**: True diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp index 00e5be6e..84bbf3ad 100644 --- a/src/data/CassandraBackend.hpp +++ b/src/data/CassandraBackend.hpp @@ -225,8 +225,9 @@ public: { waitForWritesToFinish(); + // only required when writing the first ledger, i.e. no ledger range exists yet if (!range_) { - executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_); + executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_); } if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) { @@ -514,44 +515,79 @@ public: ) const override { NFTsAndCursor ret; + std::vector nftIDs; - Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() { - if (taxon.has_value()) { - auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); - r.bindAt(1, *taxon); - r.bindAt(2, cursorIn.value_or(ripple::uint256(0))); - r.bindAt(3, Limit{limit}); - return r; + // --- A specific taxon is requested --- + if (taxon.has_value()) { + Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + statement.bindAt(1, *taxon); + statement.bindAt(2, cursorIn.value_or(ripple::uint256(0))); + statement.bindAt(3, Limit{limit}); + + auto const res = executor_.read(yield, statement); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); } + } + // --- No taxon is specified (general pagination) --- + else { + if (settingsProvider_.getSettings().provider == "aws_keyspace") { + // --- Amazon Keyspaces Workflow --- + auto const startTaxon = + cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0; + auto const startTokenID = cursorIn.value_or(ripple::uint256(0)); - auto r = schema_->selectNFTIDsByIssuer.bind(issuer); - r.bindAt( - 1, - std::make_tuple( - cursorIn.has_value() ?
ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0, - cursorIn.value_or(ripple::uint256(0)) - ) - ); - r.bindAt(2, Limit{limit}); - return r; - }(); + // Execute the first query to try and fill the page from the current taxon. + Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + firstQuery.bindAt(1, startTaxon); + firstQuery.bindAt(2, startTokenID); + firstQuery.bindAt(3, Limit{limit}); - // Query for all the NFTs issued by the account, potentially filtered by the taxon - auto const res = executor_.read(yield, idQueryStatement); + auto const firstRes = executor_.read(yield, firstQuery); + if (firstRes) { + for (auto const [nftID] : extract(firstRes.value())) + nftIDs.push_back(nftID); + } - auto const& idQueryResults = res.value(); - if (not idQueryResults.hasRows()) { + // If the page is not full, execute the second query to get the rest. + if (nftIDs.size() < limit) { + auto const remainingLimit = limit - nftIDs.size(); + Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer); + secondQuery.bindAt(1, startTaxon); + secondQuery.bindAt(2, Limit{remainingLimit}); + + auto const secondRes = executor_.read(yield, secondQuery); + if (secondRes) { + for (auto const [nftID] : extract(secondRes.value())) + nftIDs.push_back(nftID); + } + } + + } else if (settingsProvider_.getSettings().provider == "scylladb") { + auto r = schema_->selectNFTsByIssuerScylla->bind(issuer); // Note: using optional + r.bindAt( + 1, + std::make_tuple( + cursorIn.has_value() ? 
ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0, + cursorIn.value_or(ripple::uint256(0)) + ) + ); + r.bindAt(2, Limit{limit}); + + auto const res = executor_.read(yield, r); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); + } + } + } + + if (nftIDs.empty()) { LOG(log_.debug()) << "No rows returned"; return {}; } - std::vector nftIDs; - for (auto const [nftID] : extract(idQueryResults)) - nftIDs.push_back(nftID); - - if (nftIDs.empty()) - return ret; - if (nftIDs.size() == limit) ret.cursor = nftIDs.back(); @@ -803,8 +839,9 @@ public: std::optional lastItem; while (liveAccounts.size() < number) { - Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize}) - : schema_->selectAccountFromBeginning.bind(Limit{pageSize}); + Statement const statement = lastItem + ? schema_->selectAccountFromTokenScylla->bind(*lastItem, Limit{pageSize}) + : schema_->selectAccountFromBeginningScylla->bind(Limit{pageSize}); auto const res = executor_.read(yield, statement); if (res) { diff --git a/src/data/cassandra/Schema.hpp b/src/data/cassandra/Schema.hpp index 0ead25db..4ae0b4be 100644 --- a/src/data/cassandra/Schema.hpp +++ b/src/data/cassandra/Schema.hpp @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -347,6 +348,71 @@ public: Statements(SettingsProviderType const& settingsProvider, Handle const& handle) : settingsProvider_{settingsProvider}, handle_{std::cref(handle)} { + // initialize scylladb supported queries + if (settingsProvider_.get().getSettings().provider == "scylladb") { + selectAccountFromBeginningScylla = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT account + FROM {} + WHERE token(account) > 0 + PER PARTITION LIMIT 1 + LIMIT ? 
+ )", + qualifiedTableName(settingsProvider_.get(), "account_tx") + ) + ); + }(); + + selectAccountFromTokenScylla = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT account + FROM {} + WHERE token(account) > token(?) + PER PARTITION LIMIT 1 + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "account_tx") + ) + ); + }(); + + selectNFTsByIssuerScylla = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT token_id + FROM {} + WHERE issuer = ? + AND (taxon, token_id) > ? + ORDER BY taxon ASC, token_id ASC + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2") + ) + ); + }(); + // AWS_keyspace supported queries + } else if (settingsProvider_.get().getSettings().provider == "aws_keyspace") { + selectNFTsAfterTaxonKeyspaces = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT token_id + FROM {} + WHERE issuer = ? + AND taxon > ? + ORDER BY taxon ASC, token_id ASC + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2") + ) + ); + }(); + } } // @@ -526,6 +592,17 @@ public: // Update (and "delete") queries // + PreparedStatement insertLedgerRange = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS + )", + qualifiedTableName(settingsProvider_.get(), "ledger_range") + ) + ); + }(); + PreparedStatement updateLedgerRange = [this]() { return handle_.get().prepare( fmt::format( @@ -533,7 +610,7 @@ public: UPDATE {} SET sequence = ? WHERE is_latest = ? - IF sequence IN (?, null) + IF sequence = ? )", qualifiedTableName(settingsProvider_.get(), "ledger_range") ) @@ -654,6 +731,10 @@ public: ); }(); + /* + Currently, these two SELECT statements are not used.
+ If we ever use them, the PER PARTITION LIMIT clause will need to change to support AWS Keyspaces + PreparedStatement selectLedgerPageKeys = [this]() { return handle_.get().prepare( fmt::format( @@ -687,6 +768,7 @@ public: ) ); }(); + */ PreparedStatement getToken = [this]() { return handle_.get().prepare( @@ -717,36 +799,6 @@ public: ); }(); - PreparedStatement selectAccountFromBeginning = [this]() { - return handle_.get().prepare( - fmt::format( - R"( - SELECT account - FROM {} - WHERE token(account) > 0 - PER PARTITION LIMIT 1 - LIMIT ? - )", - qualifiedTableName(settingsProvider_.get(), "account_tx") - ) - ); - }(); - - PreparedStatement selectAccountFromToken = [this]() { - return handle_.get().prepare( - fmt::format( - R"( - SELECT account - FROM {} - WHERE token(account) > token(?) - PER PARTITION LIMIT 1 - LIMIT ? - )", - qualifiedTableName(settingsProvider_.get(), "account_tx") - ) - ); - }(); - PreparedStatement selectAccountTxForward = [this]() { return handle_.get().prepare( fmt::format( @@ -827,22 +879,6 @@ public: ); }(); - PreparedStatement selectNFTIDsByIssuer = [this]() { - return handle_.get().prepare( - fmt::format( - R"( - SELECT token_id - FROM {} - WHERE issuer = ? - AND (taxon, token_id) > ? - ORDER BY taxon ASC, token_id ASC - LIMIT ?
- )", - qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2") - ) - ); - }(); - PreparedStatement selectNFTIDsByIssuerTaxon = [this]() { return handle_.get().prepare( fmt::format( @@ -953,6 +989,15 @@ public: ) ); }(); + + // For ScyllaDB / Cassandra ONLY + std::optional selectAccountFromBeginningScylla; + std::optional selectAccountFromTokenScylla; + std::optional selectNFTsByIssuerScylla; + + // For AWS Keyspaces ONLY + // NOTE: AWS keyspace is not able to load cache with accounts + std::optional selectNFTsAfterTaxonKeyspaces; }; /** diff --git a/src/data/cassandra/SettingsProvider.cpp b/src/data/cassandra/SettingsProvider.cpp index 0eda7215..889633e4 100644 --- a/src/data/cassandra/SettingsProvider.cpp +++ b/src/data/cassandra/SettingsProvider.cpp @@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const settings.coreConnectionsPerHost = config_.get("core_connections_per_host"); settings.queueSizeIO = config_.maybeValue("queue_size_io"); settings.writeBatchSize = config_.get("write_batch_size"); + settings.provider = config_.get("provider"); if (config_.getValueView("connect_timeout").hasValue()) { auto const connectTimeoutSecond = config_.get("connect_timeout"); diff --git a/src/data/cassandra/impl/Batch.cpp b/src/data/cassandra/impl/Batch.cpp index b26e889c..71d9d2da 100644 --- a/src/data/cassandra/impl/Batch.cpp +++ b/src/data/cassandra/impl/Batch.cpp @@ -38,7 +38,7 @@ namespace data::cassandra::impl { // TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases Batch::Batch(std::vector const& statements) - : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER} + : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER} { cass_batch_set_is_idempotent(*this, cass_true); diff --git a/src/data/cassandra/impl/Cluster.cpp b/src/data/cassandra/impl/Cluster.cpp index 05afc145..d3faeab0 100644 --- a/src/data/cassandra/impl/Cluster.cpp +++ b/src/data/cassandra/impl/Cluster.cpp @@ -60,6 +60,11 
@@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count()); cass_cluster_set_request_timeout(*this, settings.requestTimeout.count()); + // TODO: AWS keyspace reads should be local_one to save cost + if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) { + throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc))); + } + if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost); rc != CASS_OK) { throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc))); diff --git a/src/data/cassandra/impl/Cluster.hpp b/src/data/cassandra/impl/Cluster.hpp index 61bdbfd5..0aa440e6 100644 --- a/src/data/cassandra/impl/Cluster.hpp +++ b/src/data/cassandra/impl/Cluster.hpp @@ -45,6 +45,7 @@ struct Settings { static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000; static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000; static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20; + static constexpr std::string kDEFAULT_PROVIDER = "scylladb"; /** * @brief Represents the configuration of contact points for cassandra. @@ -88,6 +89,9 @@ struct Settings { /** @brief Size of batches when writing */ std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE; + /** @brief Provider to know if we are using scylladb or keyspace */ + std::string provider = kDEFAULT_PROVIDER; + /** @brief Size of the IO queue */ std::optional queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init) diff --git a/src/data/cassandra/impl/Statement.hpp b/src/data/cassandra/impl/Statement.hpp index d184f643..ccc69302 100644 --- a/src/data/cassandra/impl/Statement.hpp +++ b/src/data/cassandra/impl/Statement.hpp @@ -58,14 +58,14 @@ public: explicit Statement(std::string_view query, Args&&... 
args) : ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER} { - cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM); + cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); cass_statement_set_is_idempotent(*this, cass_true); bind(std::forward(args)...); } /* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER} { - cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM); + cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); cass_statement_set_is_idempotent(*this, cass_true); } diff --git a/src/util/config/ConfigConstraints.hpp b/src/util/config/ConfigConstraints.hpp index fe1e7fe6..2fe37b16 100644 --- a/src/util/config/ConfigConstraints.hpp +++ b/src/util/config/ConfigConstraints.hpp @@ -84,6 +84,11 @@ static constexpr std::array kDATABASE_TYPE = {"cassandra"}; */ static constexpr std::array kPROCESSING_POLICY = {"parallel", "sequent"}; +/** + * @brief specific values that are accepted for database provider in config. + */ +static constexpr std::array kPROVIDER = {"scylladb", "aws_keyspace"}; + /** * @brief An interface to enforce constraints on certain values within ClioConfigDefinition. 
*/ @@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE}; static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE}; static constinit OneOf gValidateLogTag{"log_tag_style", kLOG_TAGS}; static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY}; +static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER}; static constinit PositiveDouble gValidatePositiveDouble{}; diff --git a/src/util/config/ConfigDefinition.cpp b/src/util/config/ConfigDefinition.cpp index c134647e..59468713 100644 --- a/src/util/config/ConfigDefinition.cpp +++ b/src/util/config/ConfigDefinition.cpp @@ -279,6 +279,8 @@ getClioConfig() {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()}, + {"database.cassandra.provider", + ConfigValue{ConfigType::String}.defaultValue("scylladb").withConstraint(gValidateProvider)}, {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, diff --git a/src/util/config/ConfigDescription.hpp b/src/util/config/ConfigDescription.hpp index 6ee16856..b6058e30 100644 --- a/src/util/config/ConfigDescription.hpp +++ b/src/util/config/ConfigDescription.hpp @@ -166,6 +166,9 @@ private: "Maximum number of outstanding read requests. Read requests are API calls that read from the database."}, KV{.key = "database.cassandra.threads", .value = "Represents the number of threads that will be used for database operations."}, + KV{.key = "database.cassandra.provider", + .value = "The specific database backend provider we are using. 
Currently we only support scylladb, or " + "aws_keyspace."}, KV{.key = "database.cassandra.core_connections_per_host", .value = "The number of core connections per host for the Cassandra database."}, KV{.key = "database.cassandra.queue_size_io", diff --git a/tests/unit/data/cassandra/SettingsProviderTests.cpp b/tests/unit/data/cassandra/SettingsProviderTests.cpp index 54c7570b..04f142b3 100644 --- a/tests/unit/data/cassandra/SettingsProviderTests.cpp +++ b/tests/unit/data/cassandra/SettingsProviderTests.cpp @@ -59,6 +59,7 @@ getParseSettingsConfig(boost::json::value val) {"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")}, {"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)}, {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},