diff --git a/docs/config-description.md b/docs/config-description.md index 57ed3aeb1..e124cd3f0 100644 --- a/docs/config-description.md +++ b/docs/config-description.md @@ -89,6 +89,14 @@ This document provides a list of all available Clio configuration properties in - **Constraints**: The minimum value is `1`. The maximum value is `4294967295`. - **Description**: Represents the number of threads that will be used for database operations. +### database.cassandra.provider + +- **Required**: True +- **Type**: string +- **Default value**: `cassandra` +- **Constraints**: The value must be one of the following: `cassandra`, `aws_keyspace`. +- **Description**: The specific database backend provider we are using. Currently we only support cassandra, or aws_keyspace. + ### database.cassandra.core_connections_per_host - **Required**: True diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp index 00e5be6e9..434516f3d 100644 --- a/src/data/CassandraBackend.hpp +++ b/src/data/CassandraBackend.hpp @@ -225,8 +225,11 @@ public: { waitForWritesToFinish(); + // !range means the table 'ledger_range' is not populated it; This would be first write to the table + // In this case, insert both min_sequence/max_sequence range into the table if (!range_) { - executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_); + executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_); + executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_); } if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) { @@ -513,80 +516,16 @@ public: boost::asio::yield_context yield ) const override { - NFTsAndCursor ret; - - Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() { - if (taxon.has_value()) { - auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); - r.bindAt(1, *taxon); - r.bindAt(2, cursorIn.value_or(ripple::uint256(0))); - r.bindAt(3, Limit{limit}); - return r; - } - - auto r = schema_->selectNFTIDsByIssuer.bind(issuer); - r.bindAt( - 1, - std::make_tuple( - cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0, - cursorIn.value_or(ripple::uint256(0)) - ) - ); - r.bindAt(2, Limit{limit}); - return r; - }(); - - // Query for all the NFTs issued by the account, potentially filtered by the taxon - auto const res = executor_.read(yield, idQueryStatement); - - auto const& idQueryResults = res.value(); - if (not idQueryResults.hasRows()) { - LOG(log_.debug()) << "No rows returned"; - return {}; - } - std::vector nftIDs; - for (auto const [nftID] : extract(idQueryResults)) - nftIDs.push_back(nftID); - - if (nftIDs.empty()) - return ret; - - if (nftIDs.size() == limit) - ret.cursor = nftIDs.back(); - - std::vector selectNFTStatements; - selectNFTStatements.reserve(nftIDs.size()); - - std::transform( - std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) { - return schema_->selectNFT.bind(nftID, ledgerSequence); - } - ); - - auto const nftInfos = executor_.readEach(yield, selectNFTStatements); - - std::vector selectNFTURIStatements; - selectNFTURIStatements.reserve(nftIDs.size()); - - std::transform( - std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) { - return schema_->selectNFTURI.bind(nftID, ledgerSequence); - } - ); - - auto const nftUris = executor_.readEach(yield, selectNFTURIStatements); - - for (auto i = 0u; i < nftIDs.size(); i++) { - if (auto const maybeRow = nftInfos[i].template get(); maybeRow) { - auto [seq, owner, isBurned] = *maybeRow; - NFT nft(nftIDs[i], seq, owner, isBurned); - if (auto const maybeUri = nftUris[i].template get(); maybeUri) - nft.uri = *maybeUri; - ret.nfts.push_back(nft); - } + // --- A specific taxon is requested --- + if (taxon.has_value()) { + nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield); + } else { + // --- No taxon is specified (general pagination) --- + nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield); } - return ret; + + return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield); } MPTHoldersAndCursor @@ -803,8 +742,9 @@ public: std::optional lastItem; while (liveAccounts.size() < number) { - Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize}) - : schema_->selectAccountFromBeginning.bind(Limit{pageSize}); + Statement const statement = lastItem + ? schema_->selectAccountFromTokenScylla->bind(*lastItem, Limit{pageSize}) + : schema_->selectAccountFromBeginningScylla->bind(Limit{pageSize}); auto const res = executor_.read(yield, statement); if (res) { @@ -1116,6 +1056,139 @@ private: return true; } + + std::vector + fetchNFTIDsByTaxon( + ripple::AccountID const& issuer, + std::uint32_t const taxon, + std::uint32_t const limit, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const + { + std::vector nftIDs; + Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + statement.bindAt(1, taxon); + statement.bindAt(2, cursorIn.value_or(ripple::uint256(0))); + statement.bindAt(3, Limit{limit}); + + auto const res = executor_.read(yield, statement); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); + } + return nftIDs; + } + + std::vector + fetchNFTIDsWithoutTaxon( + ripple::AccountID const& issuer, + std::uint32_t const limit, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const + { + std::vector nftIDs; + if (settingsProvider_.getSettings().provider == "aws_keyspace") { + // --- Amazon Keyspaces Workflow --- + auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0; + auto const startTokenID = cursorIn.value_or(ripple::uint256(0)); + + Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + firstQuery.bindAt(1, startTaxon); + firstQuery.bindAt(2, startTokenID); + firstQuery.bindAt(3, Limit{limit}); + + auto const firstRes = executor_.read(yield, firstQuery); + if (firstRes) { + for (auto const [nftID] : extract(firstRes.value())) + nftIDs.push_back(nftID); + } + + if (nftIDs.size() < limit) { + auto const remainingLimit = limit - nftIDs.size(); + Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer); + secondQuery.bindAt(1, startTaxon); + secondQuery.bindAt(2, Limit{remainingLimit}); + + auto const secondRes = executor_.read(yield, secondQuery); + if (secondRes) { + for (auto const [nftID] : extract(secondRes.value())) + nftIDs.push_back(nftID); + } + } + } else if (settingsProvider_.getSettings().provider == "scylladb") { + auto r = schema_->selectNFTsByIssuerScylla->bind(issuer); + r.bindAt( + 1, + std::make_tuple( + cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0, + cursorIn.value_or(ripple::uint256(0)) + ) + ); + r.bindAt(2, Limit{limit}); + + auto const res = executor_.read(yield, r); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); + } + } + return nftIDs; + } + + /** + * @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor. + */ + NFTsAndCursor + populateNFTsAndCreateCursor( + std::vector const& nftIDs, + std::uint32_t const ledgerSequence, + std::uint32_t const limit, + boost::asio::yield_context yield + ) const + { + if (nftIDs.empty()) { + LOG(log_.debug()) << "No rows returned"; + return {}; + } + + NFTsAndCursor ret; + if (nftIDs.size() == limit) + ret.cursor = nftIDs.back(); + + // Prepare and execute queries to fetch NFT info and URIs in parallel. + std::vector selectNFTStatements; + selectNFTStatements.reserve(nftIDs.size()); + std::transform( + std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) { + return schema_->selectNFT.bind(nftID, ledgerSequence); + } + ); + + std::vector selectNFTURIStatements; + selectNFTURIStatements.reserve(nftIDs.size()); + std::transform( + std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) { + return schema_->selectNFTURI.bind(nftID, ledgerSequence); + } + ); + + auto const nftInfos = executor_.readEach(yield, selectNFTStatements); + auto const nftUris = executor_.readEach(yield, selectNFTURIStatements); + + // Combine the results into final NFT objects. + for (auto i = 0u; i < nftIDs.size(); ++i) { + if (auto const maybeRow = nftInfos[i].template get(); maybeRow) { + auto [seq, owner, isBurned] = *maybeRow; + NFT nft(nftIDs[i], seq, owner, isBurned); + if (auto const maybeUri = nftUris[i].template get(); maybeUri) + nft.uri = *maybeUri; + ret.nfts.push_back(nft); + } + } + return ret; + } }; using CassandraBackend = BasicCassandraBackend>; diff --git a/src/data/cassandra/Schema.hpp b/src/data/cassandra/Schema.hpp index 0ead25db4..99a29f2f9 100644 --- a/src/data/cassandra/Schema.hpp +++ b/src/data/cassandra/Schema.hpp @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -347,6 +348,86 @@ public: Statements(SettingsProviderType const& settingsProvider, Handle const& handle) : settingsProvider_{settingsProvider}, handle_{std::cref(handle)} { + // initialize scylladb supported queries + if (settingsProvider_.get().getSettings().provider == "scylladb") { + selectAccountFromBeginningScylla = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT account + FROM {} + WHERE token(account) > 0 + PER PARTITION LIMIT 1 + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "account_tx") + ) + ); + }(); + + selectAccountFromTokenScylla = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT account + FROM {} + WHERE token(account) > token(?) + PER PARTITION LIMIT 1 + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "account_tx") + ) + ); + }(); + + selectNFTsByIssuerScylla = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT token_id + FROM {} + WHERE issuer = ? + AND (taxon, token_id) > ? + ORDER BY taxon ASC, token_id ASC + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2") + ) + ); + }(); + + updateLedgerRange = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + UPDATE {} + SET sequence = ? + WHERE is_latest = ? + IF sequence IN (?, null) + )", + qualifiedTableName(settingsProvider_.get(), "ledger_range") + ) + ); + }(); + + // AWS_keyspace supported queries + } else if (settingsProvider_.get().getSettings().provider == "aws_keyspace") { + selectNFTsAfterTaxonKeyspaces = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + SELECT token_id + FROM {} + WHERE issuer = ? + AND taxon > ? + ORDER BY taxon ASC, token_id ASC + LIMIT ? + )", + qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2") + ) + ); + }(); + } } // @@ -526,6 +607,17 @@ public: // Update (and "delete") queries // + PreparedStatement insertLedgerRange = [this]() { + return handle_.get().prepare( + fmt::format( + R"( + INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS + )", + qualifiedTableName(settingsProvider_.get(), "ledger_range") + ) + ); + }(); + PreparedStatement updateLedgerRange = [this]() { return handle_.get().prepare( fmt::format( @@ -533,7 +625,7 @@ public: UPDATE {} SET sequence = ? WHERE is_latest = ? - IF sequence IN (?, null) + IF sequence = ? )", qualifiedTableName(settingsProvider_.get(), "ledger_range") ) @@ -654,6 +746,10 @@ public: ); }(); + /* + Currently, these two SELECT statements is not used. + If we ever use them, will need to change the PER PARTITION LIMIT to support for Keyspace + PreparedStatement selectLedgerPageKeys = [this]() { return handle_.get().prepare( fmt::format( @@ -687,6 +783,7 @@ public: ) ); }(); + */ PreparedStatement getToken = [this]() { return handle_.get().prepare( @@ -717,36 +814,6 @@ public: ); }(); - PreparedStatement selectAccountFromBeginning = [this]() { - return handle_.get().prepare( - fmt::format( - R"( - SELECT account - FROM {} - WHERE token(account) > 0 - PER PARTITION LIMIT 1 - LIMIT ? - )", - qualifiedTableName(settingsProvider_.get(), "account_tx") - ) - ); - }(); - - PreparedStatement selectAccountFromToken = [this]() { - return handle_.get().prepare( - fmt::format( - R"( - SELECT account - FROM {} - WHERE token(account) > token(?) - PER PARTITION LIMIT 1 - LIMIT ? - )", - qualifiedTableName(settingsProvider_.get(), "account_tx") - ) - ); - }(); - PreparedStatement selectAccountTxForward = [this]() { return handle_.get().prepare( fmt::format( @@ -827,22 +894,6 @@ public: ); }(); - PreparedStatement selectNFTIDsByIssuer = [this]() { - return handle_.get().prepare( - fmt::format( - R"( - SELECT token_id - FROM {} - WHERE issuer = ? - AND (taxon, token_id) > ? - ORDER BY taxon ASC, token_id ASC - LIMIT ? - )", - qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2") - ) - ); - }(); - PreparedStatement selectNFTIDsByIssuerTaxon = [this]() { return handle_.get().prepare( fmt::format( @@ -953,6 +1004,15 @@ public: ) ); }(); + + // For ScyllaDB / Cassandra ONLY + std::optional selectAccountFromBeginningScylla; + std::optional selectAccountFromTokenScylla; + std::optional selectNFTsByIssuerScylla; + + // For AWS Keyspaces ONLY + // NOTE: AWS keyspace is not able to load cache with accounts + std::optional selectNFTsAfterTaxonKeyspaces; }; /** diff --git a/src/data/cassandra/SettingsProvider.cpp b/src/data/cassandra/SettingsProvider.cpp index 0eda7215a..889633e4a 100644 --- a/src/data/cassandra/SettingsProvider.cpp +++ b/src/data/cassandra/SettingsProvider.cpp @@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const settings.coreConnectionsPerHost = config_.get("core_connections_per_host"); settings.queueSizeIO = config_.maybeValue("queue_size_io"); settings.writeBatchSize = config_.get("write_batch_size"); + settings.provider = config_.get("provider"); if (config_.getValueView("connect_timeout").hasValue()) { auto const connectTimeoutSecond = config_.get("connect_timeout"); diff --git a/src/data/cassandra/impl/Batch.cpp b/src/data/cassandra/impl/Batch.cpp index b26e889cc..48841a278 100644 --- a/src/data/cassandra/impl/Batch.cpp +++ b/src/data/cassandra/impl/Batch.cpp @@ -36,9 +36,18 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); }; namespace data::cassandra::impl { -// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases +/* + * There are 2 main batches of Cassandra Statements: + * LOGGED: Ensures all updates in the batch succeed together, or none do. + * Use this for critical, related changes (e.g., for the same user), but it is slower. + * + * UNLOGGED: For performance. Sends many separate updates in one network trip to be fast. + * Use this for bulk-loading unrelated data, but know there's NO all-or-nothing guarantee. + * + * More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html + */ Batch::Batch(std::vector const& statements) - : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER} + : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER} { cass_batch_set_is_idempotent(*this, cass_true); diff --git a/src/data/cassandra/impl/Cluster.cpp b/src/data/cassandra/impl/Cluster.cpp index 05afc1452..cbdd22abd 100644 --- a/src/data/cassandra/impl/Cluster.cpp +++ b/src/data/cassandra/impl/Cluster.cpp @@ -60,6 +60,13 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count()); cass_cluster_set_request_timeout(*this, settings.requestTimeout.count()); + // TODO: AWS keyspace reads should be local_one to save cost + if (settings.provider == "aws_keyspace") { + if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) { + throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc))); + } + } + if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost); rc != CASS_OK) { throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc))); diff --git a/src/data/cassandra/impl/Cluster.hpp b/src/data/cassandra/impl/Cluster.hpp index 61bdbfd58..0aa440e69 100644 --- a/src/data/cassandra/impl/Cluster.hpp +++ b/src/data/cassandra/impl/Cluster.hpp @@ -45,6 +45,7 @@ struct Settings { static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000; static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000; static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20; + static constexpr std::string kDEFAULT_PROVIDER = "scylladb"; /** * @brief Represents the configuration of contact points for cassandra. @@ -88,6 +89,9 @@ struct Settings { /** @brief Size of batches when writing */ std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE; + /** @brief Provider to know if we are using scylladb or keyspace */ + std::string provider = kDEFAULT_PROVIDER; + /** @brief Size of the IO queue */ std::optional queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init) diff --git a/src/data/cassandra/impl/Statement.hpp b/src/data/cassandra/impl/Statement.hpp index d184f6430..ccc693029 100644 --- a/src/data/cassandra/impl/Statement.hpp +++ b/src/data/cassandra/impl/Statement.hpp @@ -58,14 +58,14 @@ public: explicit Statement(std::string_view query, Args&&... args) : ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER} { - cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM); + cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); cass_statement_set_is_idempotent(*this, cass_true); bind(std::forward(args)...); } /* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER} { - cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM); + cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); cass_statement_set_is_idempotent(*this, cass_true); } diff --git a/src/util/config/ConfigConstraints.hpp b/src/util/config/ConfigConstraints.hpp index 4f64d0b67..3ee89ee98 100644 --- a/src/util/config/ConfigConstraints.hpp +++ b/src/util/config/ConfigConstraints.hpp @@ -84,6 +84,11 @@ static constexpr std::array kDATABASE_TYPE = {"cassandra"}; */ static constexpr std::array kPROCESSING_POLICY = {"parallel", "sequent"}; +/** + * @brief specific values that are accepted for database provider in config. + */ +static constexpr std::array kPROVIDER = {"cassandra", "aws_keyspace"}; + /** * @brief An interface to enforce constraints on certain values within ClioConfigDefinition. */ @@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE}; static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE}; static constinit OneOf gValidateLogTag{"log.tag_style", kLOG_TAGS}; static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY}; +static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER}; static constinit PositiveDouble gValidatePositiveDouble{}; diff --git a/src/util/config/ConfigDefinition.cpp b/src/util/config/ConfigDefinition.cpp index 2ef754d3a..652f37157 100644 --- a/src/util/config/ConfigDefinition.cpp +++ b/src/util/config/ConfigDefinition.cpp @@ -285,6 +285,8 @@ getClioConfig() {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()}, + {"database.cassandra.provider", + ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateProvider)}, {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, diff --git a/src/util/config/ConfigDescription.hpp b/src/util/config/ConfigDescription.hpp index c7975bee1..e3064dca6 100644 --- a/src/util/config/ConfigDescription.hpp +++ b/src/util/config/ConfigDescription.hpp @@ -173,6 +173,9 @@ This document provides a list of all available Clio configuration properties in "Maximum number of outstanding read requests. Read requests are API calls that read from the database."}, KV{.key = "database.cassandra.threads", .value = "Represents the number of threads that will be used for database operations."}, + KV{.key = "database.cassandra.provider", + .value = "The specific database backend provider we are using. Currently we only support cassandra, or " + "aws_keyspace."}, KV{.key = "database.cassandra.core_connections_per_host", .value = "The number of core connections per host for the Cassandra database."}, KV{.key = "database.cassandra.queue_size_io", diff --git a/tests/integration/data/BackendFactoryTests.cpp b/tests/integration/data/BackendFactoryTests.cpp index ab97cd6d5..46acd60ba 100644 --- a/tests/integration/data/BackendFactoryTests.cpp +++ b/tests/integration/data/BackendFactoryTests.cpp @@ -53,6 +53,7 @@ protected: {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)}, diff --git a/tests/integration/data/cassandra/BackendTests.cpp b/tests/integration/data/cassandra/BackendTests.cpp index cd79da215..3e5a6ac63 100644 --- a/tests/integration/data/cassandra/BackendTests.cpp +++ b/tests/integration/data/cassandra/BackendTests.cpp @@ -93,6 +93,7 @@ protected: {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)}, diff --git a/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp b/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp index 393b4249c..834d775ad 100644 --- a/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp +++ b/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp @@ -103,6 +103,7 @@ protected: ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)}, diff --git a/tests/unit/data/cassandra/SettingsProviderTests.cpp b/tests/unit/data/cassandra/SettingsProviderTests.cpp index d992f163f..9b516ae5c 100644 --- a/tests/unit/data/cassandra/SettingsProviderTests.cpp +++ b/tests/unit/data/cassandra/SettingsProviderTests.cpp @@ -59,6 +59,7 @@ getParseSettingsConfig(boost::json::value val) {"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")}, {"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)}, {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},