Mirror of https://github.com/XRPLF/clio.git, synced 2025-11-04 11:55:51 +00:00.

Compare commits: 2.6.0-rc4...experiment (16 commits)
- 7eaf0005e4
- 497721ee7c
- 26530108e3
- fc88abdaeb
- 8bc36c2c0b
- 4e9558f76b
- 84db880ce7
- f88ce31363
- e03f5e46c0
- 30da8d8f63
- 8f6bec2e25
- 0d9a83fd4d
- 47c2af0421
- c3e04426d3
- d598396445
- bbd2884e3b
@@ -89,6 +89,14 @@ This document provides a list of all available Clio configuration properties in
 - **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
 - **Description**: Represents the number of threads that will be used for database operations.
 
+### database.cassandra.provider
+
+- **Required**: True
+- **Type**: string
+- **Default value**: `cassandra`
+- **Constraints**: The value must be one of the following: `cassandra`, `aws_keyspace`.
+- **Description**: The specific database backend provider we are using.
+
 ### database.cassandra.core_connections_per_host
 
 - **Required**: True
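For orientation, this property sits with the other database.cassandra settings in Clio's JSON config file; a user switching to Amazon Keyspaces would set, for example, "provider": "aws_keyspace" under the database.cassandra object (illustrative only, not part of this change).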
@@ -225,8 +225,11 @@ public:
     {
         waitForWritesToFinish();
 
-        if (!range_) {
-            executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
+        // !range_.has_value() means the 'ledger_range' table is not populated yet; this would be the
+        // first write to the table. In this case, insert both the min_sequence and max_sequence rows.
+        if (!range_.has_value()) {
+            executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
+            executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
         }
 
         if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
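To make the new first-write behavior concrete, here is a minimal, self-contained C++ sketch of the intended ledger_range state machine, using hypothetical types rather than Clio's real executor_/schema_ API: seed both rows on the first write, then advance the maximum only through a compare-and-set check (mirroring the INSERT ... IF NOT EXISTS and UPDATE ... IF sequence = ? statements introduced later in this change).

    #include <cstdint>
    #include <optional>

    struct LedgerRange {
        std::uint32_t minSequence;
        std::uint32_t maxSequence;
    };

    // Seed ledger_range on the first write; afterwards advance max by exactly
    // one, so concurrent writers cannot skip or repeat a ledger sequence.
    inline bool
    updateLedgerRange(std::optional<LedgerRange>& range, std::uint32_t seq)
    {
        if (!range.has_value()) {
            range = LedgerRange{seq, seq};  // min (is_latest=false) and max (is_latest=true)
            return true;
        }
        if (range->maxSequence != seq - 1)
            return false;  // CAS failed: another writer already advanced the range
        range->maxSequence = seq;
        return true;
    }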
@@ -513,80 +516,14 @@ public:
         boost::asio::yield_context yield
     ) const override
     {
-        NFTsAndCursor ret;
-
-        Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
-            if (taxon.has_value()) {
-                auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
-                r.bindAt(1, *taxon);
-                r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
-                r.bindAt(3, Limit{limit});
-                return r;
-            }
-
-            auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
-            r.bindAt(
-                1,
-                std::make_tuple(
-                    cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
-                    cursorIn.value_or(ripple::uint256(0))
-                )
-            );
-            r.bindAt(2, Limit{limit});
-            return r;
-        }();
-
-        // Query for all the NFTs issued by the account, potentially filtered by the taxon
-        auto const res = executor_.read(yield, idQueryStatement);
-
-        auto const& idQueryResults = res.value();
-        if (not idQueryResults.hasRows()) {
-            LOG(log_.debug()) << "No rows returned";
-            return {};
-        }
-
-        std::vector<ripple::uint256> nftIDs;
-        for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
-            nftIDs.push_back(nftID);
-
-        if (nftIDs.empty())
-            return ret;
-
-        if (nftIDs.size() == limit)
-            ret.cursor = nftIDs.back();
-
-        std::vector<Statement> selectNFTStatements;
-        selectNFTStatements.reserve(nftIDs.size());
-
-        std::transform(
-            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
-                return schema_->selectNFT.bind(nftID, ledgerSequence);
-            }
-        );
-
-        auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
-
-        std::vector<Statement> selectNFTURIStatements;
-        selectNFTURIStatements.reserve(nftIDs.size());
-
-        std::transform(
-            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
-                return schema_->selectNFTURI.bind(nftID, ledgerSequence);
-            }
-        );
-
-        auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
-
-        for (auto i = 0u; i < nftIDs.size(); i++) {
-            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
-                auto [seq, owner, isBurned] = *maybeRow;
-                NFT nft(nftIDs[i], seq, owner, isBurned);
-                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
-                    nft.uri = *maybeUri;
-                ret.nfts.push_back(nft);
-            }
-        }
-
-        return ret;
+        std::vector<ripple::uint256> nftIDs;
+        if (taxon.has_value()) {
+            nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
+        } else {
+            nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield);
+        }
+
+        return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
     }
 
     MPTHoldersAndCursor
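A note on the single uint256 cursor used by both fetch paths: an XRPL NFTokenID embeds the token's taxon, so one ID is enough to resume a (taxon, token_id)-ordered scan. The sketch below illustrates the idea with a plain 32-byte array standing in for ripple::uint256 and assumes a big-endian taxon at bytes 24..27; the real decoding is ripple::nft::getTaxon, which also unscrambles the stored taxon.

    #include <array>
    #include <cstdint>

    using TokenID = std::array<std::uint8_t, 32>;

    // Simplified taxon extraction (layout assumption: flags, fee, issuer,
    // taxon, sequence). rippled additionally scrambles the on-ledger taxon,
    // which this sketch deliberately ignores.
    inline std::uint32_t
    taxonOf(TokenID const& id)
    {
        return (std::uint32_t{id[24]} << 24) | (std::uint32_t{id[25]} << 16) |
               (std::uint32_t{id[26]} << 8) | std::uint32_t{id[27]};
    }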
@@ -803,8 +740,9 @@ public:
         std::optional<ripple::AccountID> lastItem;
 
         while (liveAccounts.size() < number) {
-            Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
-                                                 : schema_->selectAccountFromBeginning.bind(Limit{pageSize});
+            Statement const statement = lastItem
+                ? schema_->selectAccountFromTokenScylla->bind(*lastItem, Limit{pageSize})
+                : schema_->selectAccountFromBeginningScylla->bind(Limit{pageSize});
 
             auto const res = executor_.read(yield, statement);
             if (res) {
@@ -1116,6 +1054,139 @@ private:
 
         return true;
     }
 
+    std::vector<ripple::uint256>
+    fetchNFTIDsByTaxon(
+        ripple::AccountID const& issuer,
+        std::uint32_t const taxon,
+        std::uint32_t const limit,
+        std::optional<ripple::uint256> const& cursorIn,
+        boost::asio::yield_context yield
+    ) const
+    {
+        std::vector<ripple::uint256> nftIDs;
+        Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
+        statement.bindAt(1, taxon);
+        statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
+        statement.bindAt(3, Limit{limit});
+
+        auto const res = executor_.read(yield, statement);
+        if (res && res.value().hasRows()) {
+            for (auto const [nftID] : extract<ripple::uint256>(res.value()))
+                nftIDs.push_back(nftID);
+        }
+        return nftIDs;
+    }
+
+    std::vector<ripple::uint256>
+    fetchNFTIDsWithoutTaxon(
+        ripple::AccountID const& issuer,
+        std::uint32_t const limit,
+        std::optional<ripple::uint256> const& cursorIn,
+        boost::asio::yield_context yield
+    ) const
+    {
+        std::vector<ripple::uint256> nftIDs;
+        if (settingsProvider_.getSettings().provider == "aws_keyspace") {
+            // --- Amazon Keyspaces workflow ---
+            auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
+            auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
+
+            Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
+            firstQuery.bindAt(1, startTaxon);
+            firstQuery.bindAt(2, startTokenID);
+            firstQuery.bindAt(3, Limit{limit});
+
+            auto const firstRes = executor_.read(yield, firstQuery);
+            if (firstRes) {
+                for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
+                    nftIDs.push_back(nftID);
+            }
+
+            if (nftIDs.size() < limit) {
+                auto const remainingLimit = limit - nftIDs.size();
+                Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer);
+                secondQuery.bindAt(1, startTaxon);
+                secondQuery.bindAt(2, Limit{remainingLimit});
+
+                auto const secondRes = executor_.read(yield, secondQuery);
+                if (secondRes) {
+                    for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
+                        nftIDs.push_back(nftID);
+                }
+            }
+        } else if (settingsProvider_.getSettings().provider == "scylladb") {
+            auto r = schema_->selectNFTsByIssuerScylla->bind(issuer);
+            r.bindAt(
+                1,
+                std::make_tuple(
+                    cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
+                    cursorIn.value_or(ripple::uint256(0))
+                )
+            );
+            r.bindAt(2, Limit{limit});
+
+            auto const res = executor_.read(yield, r);
+            if (res && res.value().hasRows()) {
+                for (auto const [nftID] : extract<ripple::uint256>(res.value()))
+                    nftIDs.push_back(nftID);
+            }
+        }
+        return nftIDs;
+    }
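An aside on the two-step Keyspaces path just added: Amazon Keyspaces does not appear to support the multi-column tuple comparison (taxon, token_id) > (?, ?) that the ScyllaDB branch uses, so the scan is split into "rest of the cursor's taxon" followed by "all later taxons". Below is a self-contained sketch of that merge, with an in-memory, (taxon, token_id)-sorted vector standing in for the issuer_nf_tokens_v2 table:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct Row {
        std::uint32_t taxon;
        std::uint32_t tokenID;
    };

    // table must be sorted by (taxon, tokenID), matching the clustering order.
    inline std::vector<Row>
    pageAfterCursor(std::vector<Row> const& table, std::uint32_t cTaxon, std::uint32_t cID, std::size_t limit)
    {
        std::vector<Row> out;
        // Query 1: WHERE taxon = cursor_taxon AND token_id > cursor_id LIMIT ?
        for (auto const& r : table)
            if (out.size() < limit && r.taxon == cTaxon && r.tokenID > cID)
                out.push_back(r);
        // Query 2: WHERE taxon > cursor_taxon LIMIT ? (top up the page)
        for (auto const& r : table)
            if (out.size() < limit && r.taxon > cTaxon)
                out.push_back(r);
        return out;
    }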
+
+    /**
+     * @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor.
+     */
+    NFTsAndCursor
+    populateNFTsAndCreateCursor(
+        std::vector<ripple::uint256> const& nftIDs,
+        std::uint32_t const ledgerSequence,
+        std::uint32_t const limit,
+        boost::asio::yield_context yield
+    ) const
+    {
+        if (nftIDs.empty()) {
+            LOG(log_.debug()) << "No rows returned";
+            return {};
+        }
+
+        NFTsAndCursor ret;
+        if (nftIDs.size() == limit)
+            ret.cursor = nftIDs.back();
+
+        // Prepare and execute queries to fetch NFT info and URIs in parallel.
+        std::vector<Statement> selectNFTStatements;
+        selectNFTStatements.reserve(nftIDs.size());
+        std::transform(
+            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
+                return schema_->selectNFT.bind(nftID, ledgerSequence);
+            }
+        );
+
+        std::vector<Statement> selectNFTURIStatements;
+        selectNFTURIStatements.reserve(nftIDs.size());
+        std::transform(
+            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
+                return schema_->selectNFTURI.bind(nftID, ledgerSequence);
+            }
+        );
+
+        auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
+        auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
+
+        // Combine the results into final NFT objects.
+        for (auto i = 0u; i < nftIDs.size(); ++i) {
+            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
+                auto [seq, owner, isBurned] = *maybeRow;
+                NFT nft(nftIDs[i], seq, owner, isBurned);
+                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
+                    nft.uri = *maybeUri;
+                ret.nfts.push_back(nft);
+            }
+        }
+        return ret;
+    }
 };
 
 using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
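Note the cursor convention in populateNFTsAndCreateCursor: ret.cursor is only set when the ID page came back completely full (nftIDs.size() == limit), signalling that more rows may exist; a short page means the scan is exhausted and no cursor is returned.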
@@ -28,6 +28,7 @@
 
 #include <functional>
 #include <memory>
 #include <optional>
+#include <string>
 #include <string_view>
 #include <vector>
@@ -347,6 +348,86 @@ public:
     Statements(SettingsProviderType const& settingsProvider, Handle const& handle)
         : settingsProvider_{settingsProvider}, handle_{std::cref(handle)}
     {
+        // Initialize the ScyllaDB-supported queries.
+        if (settingsProvider_.get().getSettings().provider == "scylladb") {
+            selectAccountFromBeginningScylla = [this]() {
+                return handle_.get().prepare(
+                    fmt::format(
+                        R"(
+                        SELECT account
+                        FROM {}
+                        WHERE token(account) > 0
+                        PER PARTITION LIMIT 1
+                        LIMIT ?
+                        )",
+                        qualifiedTableName(settingsProvider_.get(), "account_tx")
+                    )
+                );
+            }();
+
+            selectAccountFromTokenScylla = [this]() {
+                return handle_.get().prepare(
+                    fmt::format(
+                        R"(
+                        SELECT account
+                        FROM {}
+                        WHERE token(account) > token(?)
+                        PER PARTITION LIMIT 1
+                        LIMIT ?
+                        )",
+                        qualifiedTableName(settingsProvider_.get(), "account_tx")
+                    )
+                );
+            }();
+
+            selectNFTsByIssuerScylla = [this]() {
+                return handle_.get().prepare(
+                    fmt::format(
+                        R"(
+                        SELECT token_id
+                        FROM {}
+                        WHERE issuer = ?
+                        AND (taxon, token_id) > ?
+                        ORDER BY taxon ASC, token_id ASC
+                        LIMIT ?
+                        )",
+                        qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
+                    )
+                );
+            }();
+
+            updateLedgerRange = [this]() {
+                return handle_.get().prepare(
+                    fmt::format(
+                        R"(
+                        UPDATE {}
+                        SET sequence = ?
+                        WHERE is_latest = ?
+                        IF sequence IN (?, null)
+                        )",
+                        qualifiedTableName(settingsProvider_.get(), "ledger_range")
+                    )
+                );
+            }();
+
+            // AWS Keyspaces-supported queries
+        } else if (settingsProvider_.get().getSettings().provider == "aws_keyspace") {
+            selectNFTsAfterTaxonKeyspaces = [this]() {
+                return handle_.get().prepare(
+                    fmt::format(
+                        R"(
+                        SELECT token_id
+                        FROM {}
+                        WHERE issuer = ?
+                        AND taxon > ?
+                        ORDER BY taxon ASC, token_id ASC
+                        LIMIT ?
+                        )",
+                        qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
+                    )
+                );
+            }();
+        }
     }
 
     //
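The token(account) statements above rely on PER PARTITION LIMIT 1, which returns at most one row per partition (here, per account) and therefore yields distinct accounts without scanning every row of each partition; the clause is supported by ScyllaDB/Cassandra but, apparently, not by Keyspaces, which is why these statements are only prepared for the scylladb provider. A self-contained sketch of the effect:

    #include <string>
    #include <unordered_set>
    #include <vector>

    // Simulates "PER PARTITION LIMIT 1" over rows that arrive grouped by
    // partition key: keep only the first row seen for each account.
    inline std::vector<std::string>
    perPartitionLimitOne(std::vector<std::string> const& accountRows)
    {
        std::unordered_set<std::string> seen;
        std::vector<std::string> out;
        for (auto const& account : accountRows)
            if (seen.insert(account).second)
                out.push_back(account);
        return out;
    }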
@@ -526,6 +607,17 @@ public:
     // Update (and "delete") queries
     //
 
+    PreparedStatement insertLedgerRange = [this]() {
+        return handle_.get().prepare(
+            fmt::format(
+                R"(
+                INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS
+                )",
+                qualifiedTableName(settingsProvider_.get(), "ledger_range")
+            )
+        );
+    }();
+
     PreparedStatement updateLedgerRange = [this]() {
         return handle_.get().prepare(
             fmt::format(
@@ -533,7 +625,7 @@ public:
                 UPDATE {}
                 SET sequence = ?
                 WHERE is_latest = ?
-                IF sequence IN (?, null)
+                IF sequence = ?
                 )",
                 qualifiedTableName(settingsProvider_.get(), "ledger_range")
             )
@@ -654,6 +746,10 @@ public:
         );
     }();
 
+    /*
+    Currently, these two SELECT statements are not used.
+    If we ever use them, we will need to change the PER PARTITION LIMIT clause to support Keyspaces.
+
     PreparedStatement selectLedgerPageKeys = [this]() {
         return handle_.get().prepare(
             fmt::format(
@@ -687,6 +783,7 @@ public:
             )
         );
     }();
+    */
 
     PreparedStatement getToken = [this]() {
         return handle_.get().prepare(
@@ -717,36 +814,6 @@ public:
         );
     }();
 
-    PreparedStatement selectAccountFromBeginning = [this]() {
-        return handle_.get().prepare(
-            fmt::format(
-                R"(
-                SELECT account
-                FROM {}
-                WHERE token(account) > 0
-                PER PARTITION LIMIT 1
-                LIMIT ?
-                )",
-                qualifiedTableName(settingsProvider_.get(), "account_tx")
-            )
-        );
-    }();
-
-    PreparedStatement selectAccountFromToken = [this]() {
-        return handle_.get().prepare(
-            fmt::format(
-                R"(
-                SELECT account
-                FROM {}
-                WHERE token(account) > token(?)
-                PER PARTITION LIMIT 1
-                LIMIT ?
-                )",
-                qualifiedTableName(settingsProvider_.get(), "account_tx")
-            )
-        );
-    }();
-
     PreparedStatement selectAccountTxForward = [this]() {
         return handle_.get().prepare(
             fmt::format(
@@ -827,22 +894,6 @@ public:
         );
     }();
 
-    PreparedStatement selectNFTIDsByIssuer = [this]() {
-        return handle_.get().prepare(
-            fmt::format(
-                R"(
-                SELECT token_id
-                FROM {}
-                WHERE issuer = ?
-                AND (taxon, token_id) > ?
-                ORDER BY taxon ASC, token_id ASC
-                LIMIT ?
-                )",
-                qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
-            )
-        );
-    }();
-
     PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
         return handle_.get().prepare(
             fmt::format(
@@ -953,6 +1004,15 @@ public:
             )
         );
     }();
 
+    // For ScyllaDB / Cassandra ONLY
+    std::optional<PreparedStatement> selectAccountFromBeginningScylla;
+    std::optional<PreparedStatement> selectAccountFromTokenScylla;
+    std::optional<PreparedStatement> selectNFTsByIssuerScylla;
+
+    // For AWS Keyspaces ONLY
+    // NOTE: AWS Keyspaces is not able to load the cache with accounts
+    std::optional<PreparedStatement> selectNFTsAfterTaxonKeyspaces;
 };
 
 /**
@@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const
     settings.coreConnectionsPerHost = config_.get<uint32_t>("core_connections_per_host");
     settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
    settings.writeBatchSize = config_.get<std::size_t>("write_batch_size");
+    settings.provider = config_.get<std::string>("provider");
 
     if (config_.getValueView("connect_timeout").hasValue()) {
         auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");
@@ -36,9 +36,18 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
 
 namespace data::cassandra::impl {
 
-// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
+/*
+ * There are two main types of Cassandra statement batches:
+ * LOGGED: ensures all updates in the batch succeed together, or none do.
+ *   Use this for critical, related changes (e.g., for the same user), but it is slower.
+ *
+ * UNLOGGED: for performance. Sends many separate updates in one network trip to be fast.
+ *   Use this for bulk-loading unrelated data, but know there is NO all-or-nothing guarantee.
+ *
+ * More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html
+ */
 Batch::Batch(std::vector<Statement> const& statements)
-    : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER}
+    : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER}
 {
     cass_batch_set_is_idempotent(*this, cass_true);
 
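For context, here is a minimal sketch of the same choice made directly against the DataStax C++ driver: an UNLOGGED batch trades the all-or-nothing guarantee for a single network round trip. The keyspace/table names are placeholders.

    #include <cassandra.h>

    void writeUnloggedBatch(CassSession* session)
    {
        CassBatch* batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED);
        CassStatement* s1 = cass_statement_new("INSERT INTO ks.t (k, v) VALUES (1, 'a')", 0);
        CassStatement* s2 = cass_statement_new("INSERT INTO ks.t (k, v) VALUES (2, 'b')", 0);
        cass_batch_add_statement(batch, s1);
        cass_batch_add_statement(batch, s2);

        CassFuture* fut = cass_session_execute_batch(session, batch);
        cass_future_wait(fut);  // note: no atomicity across the two inserts
        cass_future_free(fut);

        cass_statement_free(s1);
        cass_statement_free(s2);
        cass_batch_free(batch);
    }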
@@ -60,6 +60,13 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k
     cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
     cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
 
+    // TODO: AWS Keyspaces reads should be LOCAL_ONE to save cost
+    if (settings.provider == "aws_keyspace") {
+        if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) {
+            throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc)));
+        }
+    }
+
     if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
         rc != CASS_OK) {
         throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc)));
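Background for this branch: per AWS documentation, Keyspaces only accepts LOCAL_QUORUM for writes, while reads may additionally use ONE or LOCAL_ONE; that is why consistency is forced here for the aws_keyspace provider, and why the TODO suggests dropping reads to LOCAL_ONE to reduce read cost.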
@@ -45,6 +45,7 @@ struct Settings {
     static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
     static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
     static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20;
+    static constexpr std::string kDEFAULT_PROVIDER = "cassandra";
 
     /**
      * @brief Represents the configuration of contact points for cassandra.
@@ -83,11 +84,14 @@ struct Settings {
     uint32_t maxReadRequestsOutstanding = kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
 
     /** @brief The number of connections per host to always have active */
-    uint32_t coreConnectionsPerHost = 1u;
+    uint32_t coreConnectionsPerHost = 3u;
 
     /** @brief Size of batches when writing */
     std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
 
+    /** @brief Provider, to know whether we are using ScyllaDB or Keyspaces */
+    std::string provider = kDEFAULT_PROVIDER;
+
     /** @brief Size of the IO queue */
     std::optional<uint32_t> queueSizeIO = std::nullopt;  // NOLINT(readability-redundant-member-init)
 
@@ -58,14 +58,14 @@ public:
     explicit Statement(std::string_view query, Args&&... args)
         : ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
     {
-        cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
+        cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
         cass_statement_set_is_idempotent(*this, cass_true);
         bind<Args...>(std::forward<Args>(args)...);
     }
 
     /* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
     {
-        cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
+        cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
         cass_statement_set_is_idempotent(*this, cass_true);
     }
 
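A note on the QUORUM to LOCAL_QUORUM switch: on a single-datacenter ScyllaDB or Cassandra cluster the two behave identically, while on multi-datacenter deployments LOCAL_QUORUM confines the quorum to the local datacenter and avoids cross-DC round trips; it also matches the consistency Keyspaces requires, so one default now serves both providers.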
@@ -84,6 +84,11 @@ static constexpr std::array<char const*, 1> kDATABASE_TYPE = {"cassandra"};
  */
 static constexpr std::array<char const*, 2> kPROCESSING_POLICY = {"parallel", "sequent"};
 
+/**
+ * @brief Specific values that are accepted for the database provider in config.
+ */
+static constexpr std::array<char const*, 2> kPROVIDER = {"cassandra", "aws_keyspace"};
+
 /**
  * @brief An interface to enforce constraints on certain values within ClioConfigDefinition.
  */
@@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE};
 static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE};
 static constinit OneOf gValidateLogTag{"log.tag_style", kLOG_TAGS};
 static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY};
+static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER};
 
 static constinit PositiveDouble gValidatePositiveDouble{};
@@ -285,6 +285,8 @@ getClioConfig()
         {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
+        {"database.cassandra.provider",
+         ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateProvider)},
 
         {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
         {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
@@ -173,6 +173,7 @@ This document provides a list of all available Clio configuration properties in
        "Maximum number of outstanding read requests. Read requests are API calls that read from the database."},
     KV{.key = "database.cassandra.threads",
        .value = "Represents the number of threads that will be used for database operations."},
+    KV{.key = "database.cassandra.provider", .value = "The specific database backend provider we are using."},
     KV{.key = "database.cassandra.core_connections_per_host",
        .value = "The number of core connections per host for the Cassandra database."},
     KV{.key = "database.cassandra.queue_size_io",
@@ -44,6 +44,7 @@ using namespace util::config;
 
 struct BackendCassandraFactoryTest : SyncAsioContextTest, util::prometheus::WithPrometheus {
     static constexpr auto kKEYSPACE = "factory_test";
+    static constexpr auto kPROVIDER = "cassandra";
 
 protected:
     ClioConfigDefinition cfg_{
@@ -53,6 +54,7 @@ protected:
         {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
         {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)},
+        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kPROVIDER)},
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
         {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},
@@ -93,6 +93,7 @@ protected:
         {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
         {"database.cassandra.keyspace",
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
+        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
         {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},
@@ -103,6 +103,7 @@ protected:
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
         {"database.cassandra.keyspace",
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
+        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
         {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},
@@ -59,6 +59,7 @@ getParseSettingsConfig(boost::json::value val)
         {"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
         {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
+        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
         {"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()},
         {"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)},
         {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},