Compare commits

...

16 Commits

Author SHA1 Message Date
Peter Chen
7eaf0005e4 Merge tag 'experiment' into experimental_keyspace
test keyspace
2025-09-16 09:08:58 -07:00
Peter Chen
497721ee7c change connections per host 2025-09-15 10:06:12 -07:00
Peter Chen
26530108e3 fix comments 2025-09-15 08:55:14 -07:00
Alex Kremer
fc88abdaeb Merge branch 'develop' into SupportKeyspace 2025-09-15 14:59:54 +01:00
Ayaz Salikhov
8bc36c2c0b Merge branch 'develop' into SupportKeyspace 2025-09-04 18:23:37 +01:00
Peter Chen
4e9558f76b Merge branch 'SupportKeyspace' into experimental_keyspace 2025-09-04 08:44:50 -07:00
Ayaz Salikhov
84db880ce7 Merge branch 'develop' into SupportKeyspace 2025-08-28 15:11:58 +01:00
Peter Chen
f88ce31363 let scylladb use same schema to update ledger range as before 2025-08-27 14:34:34 -07:00
Peter Chen
e03f5e46c0 Merge branch 'develop' into SupportKeyspace 2025-08-24 18:58:38 -04:00
Peter Chen
30da8d8f63 fix val 2025-08-24 18:58:07 -04:00
Peter Chen
8f6bec2e25 merge develop 2025-08-21 12:37:19 -04:00
Peter Chen
0d9a83fd4d fix comments 2025-08-20 12:29:01 -04:00
Peter Chen
47c2af0421 Merge branch 'develop' into SupportKeyspace 2025-08-19 16:37:08 -04:00
Peter Chen
c3e04426d3 fix bug and comments 2025-08-19 16:19:44 -04:00
Peter Chen
d598396445 remove unnecessary comment 2025-08-18 16:33:12 -04:00
Peter Chen
bbd2884e3b feat: Support Keyspace 2025-08-18 15:38:56 -04:00
15 changed files with 302 additions and 128 deletions

View File

@@ -89,6 +89,14 @@ This document provides a list of all available Clio configuration properties in
- **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
- **Description**: Represents the number of threads that will be used for database operations.
### database.cassandra.provider
- **Required**: True
- **Type**: string
- **Default value**: `cassandra`
- **Constraints**: The value must be one of the following: `cassandra`, `aws_keyspace`.
- **Description**: The specific database backend provider we are using.
### database.cassandra.core_connections_per_host
- **Required**: True
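For illustration only, a minimal standalone sketch (not Clio's actual config code) of how the constraint above behaves: an unset value falls back to `cassandra`, and anything other than the two allowed strings is rejected.

```cpp
#include <algorithm>
#include <array>
#include <iostream>
#include <stdexcept>
#include <string>

// Mirrors the documented constraint: the value must be one of these, defaulting to "cassandra".
constexpr std::array<char const*, 2> kAllowedProviders{"cassandra", "aws_keyspace"};

std::string
resolveProvider(std::string const& configured)
{
    if (configured.empty())
        return "cassandra";  // documented default value

    bool const valid = std::any_of(kAllowedProviders.begin(), kAllowedProviders.end(), [&](char const* allowed) {
        return configured == allowed;
    });
    if (!valid)
        throw std::invalid_argument("database.cassandra.provider must be 'cassandra' or 'aws_keyspace'");
    return configured;
}

int
main()
{
    std::cout << resolveProvider("") << '\n';              // prints: cassandra
    std::cout << resolveProvider("aws_keyspace") << '\n';  // prints: aws_keyspace
}
```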

View File

@@ -225,8 +225,11 @@ public:
{
waitForWritesToFinish();
-if (!range_) {
-executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
// !range_.has_value() means the table 'ledger_range' is not populated; this would be the first write to the
// table. In this case, insert both the min_sequence and max_sequence rows into the table.
if (!range_.has_value()) {
executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
}
if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
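For intuition only, a self-contained sketch of the two conditional writes this hunk relies on: `insertLedgerRange` behaves like `INSERT ... IF NOT EXISTS` (seeding both rows on the very first write), and `updateLedgerRange` behaves like a compare-and-set that only advances the latest sequence when the expected previous value matches. The helper names and the in-memory map below are illustrative, not Clio code.

```cpp
#include <cstdint>
#include <iostream>
#include <map>

// ledger_range holds one row per is_latest flag: false -> min sequence, true -> max sequence.
std::map<bool, uint32_t> ledgerRange;

// Roughly INSERT INTO ledger_range (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS.
bool
insertIfNotExists(bool isLatest, uint32_t seq)
{
    return ledgerRange.emplace(isLatest, seq).second;
}

// Roughly UPDATE ledger_range SET sequence = ? WHERE is_latest = ? IF sequence = ? (a compare-and-set).
bool
updateIfSequenceIs(uint32_t newSeq, bool isLatest, uint32_t expectedSeq)
{
    auto const it = ledgerRange.find(isLatest);
    if (it == ledgerRange.end() || it->second != expectedSeq)
        return false;
    it->second = newSeq;
    return true;
}

int
main()
{
    // First ever write: the table is empty, so seed both the min and the max row with the same sequence.
    insertIfNotExists(false, 100);
    insertIfNotExists(true, 100);

    // Later writes only advance the max row when the previous sequence is what the writer expects.
    std::cout << updateIfSequenceIs(101, true, 100) << '\n';  // 1: advanced 100 -> 101
    std::cout << updateIfSequenceIs(102, true, 100) << '\n';  // 0: stale expectation, rejected
}
```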
@@ -513,80 +516,14 @@ public:
boost::asio::yield_context yield
) const override
{
-NFTsAndCursor ret;
-Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
-if (taxon.has_value()) {
-auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
-r.bindAt(1, *taxon);
-r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
-r.bindAt(3, Limit{limit});
-return r;
-}
-auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
-r.bindAt(
-1,
-std::make_tuple(
-cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
-cursorIn.value_or(ripple::uint256(0))
-)
-);
-r.bindAt(2, Limit{limit});
-return r;
-}();
-// Query for all the NFTs issued by the account, potentially filtered by the taxon
-auto const res = executor_.read(yield, idQueryStatement);
-auto const& idQueryResults = res.value();
-if (not idQueryResults.hasRows()) {
-LOG(log_.debug()) << "No rows returned";
-return {};
-}
std::vector<ripple::uint256> nftIDs;
-for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
-nftIDs.push_back(nftID);
-if (nftIDs.empty())
-return ret;
-if (nftIDs.size() == limit)
-ret.cursor = nftIDs.back();
-std::vector<Statement> selectNFTStatements;
-selectNFTStatements.reserve(nftIDs.size());
-std::transform(
-std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
-return schema_->selectNFT.bind(nftID, ledgerSequence);
-}
-);
-auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
-std::vector<Statement> selectNFTURIStatements;
-selectNFTURIStatements.reserve(nftIDs.size());
-std::transform(
-std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
-return schema_->selectNFTURI.bind(nftID, ledgerSequence);
-}
-);
-auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
-for (auto i = 0u; i < nftIDs.size(); i++) {
-if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
-auto [seq, owner, isBurned] = *maybeRow;
-NFT nft(nftIDs[i], seq, owner, isBurned);
-if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
-nft.uri = *maybeUri;
-ret.nfts.push_back(nft);
-}
-}
-return ret;
if (taxon.has_value()) {
nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
} else {
nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield);
}
return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
}
MPTHoldersAndCursor
@@ -803,8 +740,9 @@ public:
std::optional<ripple::AccountID> lastItem;
while (liveAccounts.size() < number) {
-Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
-: schema_->selectAccountFromBeginning.bind(Limit{pageSize});
Statement const statement = lastItem
? schema_->selectAccountFromTokenScylla->bind(*lastItem, Limit{pageSize})
: schema_->selectAccountFromBeginningScylla->bind(Limit{pageSize});
auto const res = executor_.read(yield, statement);
if (res) {
@@ -1116,6 +1054,139 @@ private:
return true;
}
std::vector<ripple::uint256>
fetchNFTIDsByTaxon(
ripple::AccountID const& issuer,
std::uint32_t const taxon,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const
{
std::vector<ripple::uint256> nftIDs;
Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
statement.bindAt(1, taxon);
statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
statement.bindAt(3, Limit{limit});
auto const res = executor_.read(yield, statement);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
return nftIDs;
}
std::vector<ripple::uint256>
fetchNFTIDsWithoutTaxon(
ripple::AccountID const& issuer,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const
{
std::vector<ripple::uint256> nftIDs;
if (settingsProvider_.getSettings().provider == "aws_keyspace") {
// --- Amazon Keyspaces Workflow ---
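// The Keyspaces paging below avoids the (taxon, token_id) tuple comparison used in the ScyllaDB branch:
// first drain the remaining token_ids within the cursor's taxon, then continue with the taxons after it.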
auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
firstQuery.bindAt(1, startTaxon);
firstQuery.bindAt(2, startTokenID);
firstQuery.bindAt(3, Limit{limit});
auto const firstRes = executor_.read(yield, firstQuery);
if (firstRes) {
for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
nftIDs.push_back(nftID);
}
if (nftIDs.size() < limit) {
auto const remainingLimit = limit - nftIDs.size();
Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer);
secondQuery.bindAt(1, startTaxon);
secondQuery.bindAt(2, Limit{remainingLimit});
auto const secondRes = executor_.read(yield, secondQuery);
if (secondRes) {
for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
nftIDs.push_back(nftID);
}
}
} else if (settingsProvider_.getSettings().provider == "scylladb") {
auto r = schema_->selectNFTsByIssuerScylla->bind(issuer);
r.bindAt(
1,
std::make_tuple(
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
cursorIn.value_or(ripple::uint256(0))
)
);
r.bindAt(2, Limit{limit});
auto const res = executor_.read(yield, r);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
}
return nftIDs;
}
/**
* @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor.
*/
NFTsAndCursor
populateNFTsAndCreateCursor(
std::vector<ripple::uint256> const& nftIDs,
std::uint32_t const ledgerSequence,
std::uint32_t const limit,
boost::asio::yield_context yield
) const
{
if (nftIDs.empty()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
NFTsAndCursor ret;
if (nftIDs.size() == limit)
ret.cursor = nftIDs.back();
// Prepare and execute queries to fetch NFT info and URIs in parallel.
std::vector<Statement> selectNFTStatements;
selectNFTStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
return schema_->selectNFT.bind(nftID, ledgerSequence);
}
);
std::vector<Statement> selectNFTURIStatements;
selectNFTURIStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
return schema_->selectNFTURI.bind(nftID, ledgerSequence);
}
);
auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
// Combine the results into final NFT objects.
for (auto i = 0u; i < nftIDs.size(); ++i) {
if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
auto [seq, owner, isBurned] = *maybeRow;
NFT nft(nftIDs[i], seq, owner, isBurned);
if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
nft.uri = *maybeUri;
ret.nfts.push_back(nft);
}
}
return ret;
}
};
using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
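The refactor above keeps the original cursor rule: `populateNFTsAndCreateCursor` only sets a cursor when a full page of `limit` IDs came back, and the cursor is the last ID of that page. A tiny self-contained illustration of that paging contract (hypothetical names, not the backend API):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

struct Page {
    std::vector<uint32_t> ids;
    std::optional<uint32_t> cursor;  // set only when the page is full, i.e. more data may follow
};

// Returns up to `limit` ids greater than `cursor` from an already-sorted collection.
Page
fetchPage(std::vector<uint32_t> const& all, std::optional<uint32_t> cursor, std::size_t limit)
{
    Page page;
    for (auto id : all) {
        if (cursor && id <= *cursor)
            continue;
        page.ids.push_back(id);
        if (page.ids.size() == limit)
            break;
    }
    if (page.ids.size() == limit)
        page.cursor = page.ids.back();  // same rule as populateNFTsAndCreateCursor
    return page;
}

int
main()
{
    std::vector<uint32_t> const tokenIDs{1, 2, 3, 4, 5};
    std::optional<uint32_t> cursor;
    do {
        auto const page = fetchPage(tokenIDs, cursor, 2);
        for (auto id : page.ids)
            std::cout << id << ' ';
        std::cout << '\n';  // prints "1 2", then "3 4", then "5" (no cursor, so the loop stops)
        cursor = page.cursor;
    } while (cursor);
}
```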

View File

@@ -28,6 +28,7 @@
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
@@ -347,6 +348,86 @@ public:
Statements(SettingsProviderType const& settingsProvider, Handle const& handle)
: settingsProvider_{settingsProvider}, handle_{std::cref(handle)}
{
// initialize scylladb supported queries
if (settingsProvider_.get().getSettings().provider == "scylladb") {
selectAccountFromBeginningScylla = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > 0
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
selectAccountFromTokenScylla = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > token(?)
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
selectNFTsByIssuerScylla = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND (taxon, token_id) > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
updateLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
UPDATE {}
SET sequence = ?
WHERE is_latest = ?
IF sequence IN (?, null)
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
// AWS_keyspace supported queries
} else if (settingsProvider_.get().getSettings().provider == "aws_keyspace") {
selectNFTsAfterTaxonKeyspaces = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND taxon > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
}
}
//
@@ -526,6 +607,17 @@ public:
// Update (and "delete") queries
//
PreparedStatement insertLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
PreparedStatement updateLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -533,7 +625,7 @@ public:
UPDATE {}
SET sequence = ?
WHERE is_latest = ?
-IF sequence IN (?, null)
IF sequence = ?
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
@@ -654,6 +746,10 @@ public:
);
}();
/*
Currently, these two SELECT statements are not used.
If we ever use them, we will need to change the PER PARTITION LIMIT to support Keyspaces.
PreparedStatement selectLedgerPageKeys = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -687,6 +783,7 @@ public:
)
);
}();
*/
PreparedStatement getToken = [this]() {
return handle_.get().prepare(
@@ -717,36 +814,6 @@ public:
);
}();
-PreparedStatement selectAccountFromBeginning = [this]() {
-return handle_.get().prepare(
-fmt::format(
-R"(
-SELECT account
-FROM {}
-WHERE token(account) > 0
-PER PARTITION LIMIT 1
-LIMIT ?
-)",
-qualifiedTableName(settingsProvider_.get(), "account_tx")
-)
-);
-}();
-PreparedStatement selectAccountFromToken = [this]() {
-return handle_.get().prepare(
-fmt::format(
-R"(
-SELECT account
-FROM {}
-WHERE token(account) > token(?)
-PER PARTITION LIMIT 1
-LIMIT ?
-)",
-qualifiedTableName(settingsProvider_.get(), "account_tx")
-)
-);
-}();
PreparedStatement selectAccountTxForward = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -827,22 +894,6 @@ public:
);
}();
-PreparedStatement selectNFTIDsByIssuer = [this]() {
-return handle_.get().prepare(
-fmt::format(
-R"(
-SELECT token_id
-FROM {}
-WHERE issuer = ?
-AND (taxon, token_id) > ?
-ORDER BY taxon ASC, token_id ASC
-LIMIT ?
-)",
-qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
-)
-);
-}();
PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -953,6 +1004,15 @@ public:
)
);
}();
// For ScyllaDB / Cassandra ONLY
std::optional<PreparedStatement> selectAccountFromBeginningScylla;
std::optional<PreparedStatement> selectAccountFromTokenScylla;
std::optional<PreparedStatement> selectNFTsByIssuerScylla;
// For AWS Keyspaces ONLY
// NOTE: AWS Keyspaces is not able to load the cache with accounts
std::optional<PreparedStatement> selectNFTsAfterTaxonKeyspaces;
};
/**

View File

@@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const
settings.coreConnectionsPerHost = config_.get<uint32_t>("core_connections_per_host");
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
settings.writeBatchSize = config_.get<std::size_t>("write_batch_size");
settings.provider = config_.get<std::string>("provider");
if (config_.getValueView("connect_timeout").hasValue()) {
auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");

View File

@@ -36,9 +36,18 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
namespace data::cassandra::impl {
-// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
/*
* There are 2 main batch types for Cassandra statements:
* LOGGED: Ensures all updates in the batch succeed together, or none do.
* Use this for critical, related changes (e.g., for the same user), but it is slower.
*
* UNLOGGED: For performance. Sends many separate updates in one network trip to be fast.
* Use this for bulk-loading unrelated data, but know there's NO all-or-nothing guarantee.
*
* More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html
*/
Batch::Batch(std::vector<Statement> const& statements)
-: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER}
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER}
{
cass_batch_set_is_idempotent(*this, cass_true);
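For reference, a minimal sketch of building and executing an UNLOGGED batch directly with the DataStax C/C++ driver API that `Batch` wraps; the connected `session` and the `example.kv` table are assumptions for illustration.

```cpp
#include <cassandra.h>

// Execute two unrelated inserts in one round trip; UNLOGGED gives no atomicity across them.
CassError
writeUnloggedBatch(CassSession* session)
{
    CassBatch* batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED);
    cass_batch_set_is_idempotent(batch, cass_true);

    CassStatement* first = cass_statement_new("INSERT INTO example.kv (k, v) VALUES ('a', 1)", 0);
    CassStatement* second = cass_statement_new("INSERT INTO example.kv (k, v) VALUES ('b', 2)", 0);
    cass_batch_add_statement(batch, first);
    cass_batch_add_statement(batch, second);

    CassFuture* future = cass_session_execute_batch(session, batch);
    CassError const rc = cass_future_error_code(future);

    cass_future_free(future);
    cass_statement_free(first);
    cass_statement_free(second);
    cass_batch_free(batch);
    return rc;
}
```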

View File

@@ -60,6 +60,13 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k
cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
// TODO: AWS keyspace reads should be local_one to save cost
if (settings.provider == "aws_keyspace") {
if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) {
throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc)));
}
}
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
rc != CASS_OK) {
throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc)));
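One possible follow-up to the TODO above (a sketch, not part of this change): keep the cluster-wide default at LOCAL_QUORUM for writes and downgrade only read statements to LOCAL_ONE, which the TODO notes is cheaper on AWS Keyspaces.

```cpp
#include <cassandra.h>

// Hypothetical helper: writes keep the cluster default (LOCAL_QUORUM, set above),
// while read statements are downgraded to LOCAL_ONE as suggested by the TODO.
CassError
useKeyspacesReadConsistency(CassStatement* readStatement)
{
    return cass_statement_set_consistency(readStatement, CASS_CONSISTENCY_LOCAL_ONE);
}
```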

View File

@@ -45,6 +45,7 @@ struct Settings {
static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20;
static constexpr std::string kDEFAULT_PROVIDER = "cassandra";
/**
* @brief Represents the configuration of contact points for cassandra.
@@ -83,11 +84,14 @@ struct Settings {
uint32_t maxReadRequestsOutstanding = kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
/** @brief The number of connection per host to always have active */
-uint32_t coreConnectionsPerHost = 1u;
uint32_t coreConnectionsPerHost = 3u;
/** @brief Size of batches when writing */
std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
/** @brief Provider to know if we are using scylladb or keyspace */
std::string provider = kDEFAULT_PROVIDER;
/** @brief Size of the IO queue */
std::optional<uint32_t> queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init)

View File

@@ -58,14 +58,14 @@ public:
explicit Statement(std::string_view query, Args&&... args)
: ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
{
-cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
bind<Args...>(std::forward<Args>(args)...);
}
/* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
{
-cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
}

View File

@@ -84,6 +84,11 @@ static constexpr std::array<char const*, 1> kDATABASE_TYPE = {"cassandra"};
*/
static constexpr std::array<char const*, 2> kPROCESSING_POLICY = {"parallel", "sequent"};
/**
* @brief Specific values that are accepted for the database provider in config.
*/
static constexpr std::array<char const*, 2> kPROVIDER = {"cassandra", "aws_keyspace"};
/**
* @brief An interface to enforce constraints on certain values within ClioConfigDefinition.
*/
@@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE};
static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE};
static constinit OneOf gValidateLogTag{"log.tag_style", kLOG_TAGS};
static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY};
static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER};
static constinit PositiveDouble gValidatePositiveDouble{};

View File

@@ -285,6 +285,8 @@ getClioConfig()
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.provider",
ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateProvider)},
{"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},

View File

@@ -173,6 +173,7 @@ This document provides a list of all available Clio configuration properties in
"Maximum number of outstanding read requests. Read requests are API calls that read from the database."},
KV{.key = "database.cassandra.threads",
.value = "Represents the number of threads that will be used for database operations."},
KV{.key = "database.cassandra.provider", .value = "The specific database backend provider we are using."},
KV{.key = "database.cassandra.core_connections_per_host",
.value = "The number of core connections per host for the Cassandra database."},
KV{.key = "database.cassandra.queue_size_io",

View File

@@ -44,6 +44,7 @@ using namespace util::config;
struct BackendCassandraFactoryTest : SyncAsioContextTest, util::prometheus::WithPrometheus {
static constexpr auto kKEYSPACE = "factory_test";
static constexpr auto kPROVIDER = "cassandra";
protected:
ClioConfigDefinition cfg_{
@@ -53,6 +54,7 @@ protected:
{"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kPROVIDER)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},

View File

@@ -93,6 +93,7 @@ protected:
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},

View File

@@ -103,6 +103,7 @@ protected:
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},

View File

@@ -59,6 +59,7 @@ getParseSettingsConfig(boost::json::value val)
{"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
{"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},