feat: Support Keyspace

This commit is contained in:
Peter Chen
2025-08-18 15:38:56 -04:00
parent 73bc85864b
commit bbd2884e3b
12 changed files with 194 additions and 82 deletions

View File

@@ -87,6 +87,14 @@ This document provides a list of all available Clio configuration properties in
- **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
- **Description**: Represents the number of threads that will be used for database operations.
### database.cassandra.provider
- **Required**: True
- **Type**: string
- **Default value**: `scylladb`
- **Constraints**: The value must be one of the following: `scylladb`, `aws_keyspace`.
- **Description**: The specific database backend provider we are using. Currently we only support scylladb, or aws_keyspace.
### database.cassandra.core_connections_per_host
- **Required**: True

View File

@@ -225,8 +225,9 @@ public:
{
waitForWritesToFinish();
// only run if require to write in first ledger
if (!range_) {
executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
}
if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
@@ -514,44 +515,79 @@ public:
) const override
{
NFTsAndCursor ret;
std::vector<ripple::uint256> nftIDs;
Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
if (taxon.has_value()) {
auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
r.bindAt(1, *taxon);
r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
r.bindAt(3, Limit{limit});
return r;
// --- A specific taxon is requested ---
if (taxon.has_value()) {
Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
statement.bindAt(1, *taxon);
statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
statement.bindAt(3, Limit{limit});
auto const res = executor_.read(yield, statement);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
}
// --- No taxon is specified (general pagination) ---
else {
if (settingsProvider_.getSettings().provider == "aws_keyspace") {
// --- Amazon Keyspaces Workflow ---
auto const startTaxon =
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
r.bindAt(
1,
std::make_tuple(
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
cursorIn.value_or(ripple::uint256(0))
)
);
r.bindAt(2, Limit{limit});
return r;
}();
// Execute the first query to try and fill the page from the current taxon.
Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
firstQuery.bindAt(1, startTaxon);
firstQuery.bindAt(2, startTokenID);
firstQuery.bindAt(3, Limit{limit});
// Query for all the NFTs issued by the account, potentially filtered by the taxon
auto const res = executor_.read(yield, idQueryStatement);
auto const firstRes = executor_.read(yield, firstQuery);
if (firstRes) {
for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
nftIDs.push_back(nftID);
}
auto const& idQueryResults = res.value();
if (not idQueryResults.hasRows()) {
// If the page is not full, execute the second query to get the rest.
if (nftIDs.size() < limit) {
auto const remainingLimit = limit - nftIDs.size();
Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer);
secondQuery.bindAt(1, startTaxon);
secondQuery.bindAt(2, Limit{remainingLimit});
auto const secondRes = executor_.read(yield, secondQuery);
if (secondRes) {
for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
nftIDs.push_back(nftID);
}
}
} else if (settingsProvider_.getSettings().provider == "scylladb") {
auto r = schema_->selectNFTsByIssuerScylla->bind(issuer); // Note: using optional
r.bindAt(
1,
std::make_tuple(
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
cursorIn.value_or(ripple::uint256(0))
)
);
r.bindAt(2, Limit{limit});
auto const res = executor_.read(yield, r);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
}
}
if (nftIDs.empty()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
std::vector<ripple::uint256> nftIDs;
for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
nftIDs.push_back(nftID);
if (nftIDs.empty())
return ret;
if (nftIDs.size() == limit)
ret.cursor = nftIDs.back();
@@ -803,8 +839,9 @@ public:
std::optional<ripple::AccountID> lastItem;
while (liveAccounts.size() < number) {
Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
: schema_->selectAccountFromBeginning.bind(Limit{pageSize});
Statement const statement = lastItem
? schema_->selectAccountFromTokenScylla->bind(*lastItem, Limit{pageSize})
: schema_->selectAccountFromBeginningScylla->bind(Limit{pageSize});
auto const res = executor_.read(yield, statement);
if (res) {

View File

@@ -28,6 +28,7 @@
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
@@ -347,6 +348,71 @@ public:
Statements(SettingsProviderType const& settingsProvider, Handle const& handle)
: settingsProvider_{settingsProvider}, handle_{std::cref(handle)}
{
// initialize scylladb supported queries
if (settingsProvider_.get().getSettings().provider == "scylladb") {
selectAccountFromBeginningScylla = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > 0
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
selectAccountFromTokenScylla = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > token(?)
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
selectNFTsByIssuerScylla = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND (taxon, token_id) > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
// AWS_keyspace supported queries
} else if (settingsProvider_.get().getSettings().provider == "aws_keyspace") {
selectNFTsAfterTaxonKeyspaces = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND taxon > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
}
}
//
@@ -526,6 +592,17 @@ public:
// Update (and "delete") queries
//
PreparedStatement insertLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
);
}();
PreparedStatement updateLedgerRange = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -533,7 +610,7 @@ public:
UPDATE {}
SET sequence = ?
WHERE is_latest = ?
IF sequence IN (?, null)
IF sequence = ?
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
)
@@ -654,6 +731,10 @@ public:
);
}();
/*
Currently, these two SELECT statements is not used.
If we ever use them, will need to change the PER PARTITION LIMIT to support for Keyspace
PreparedStatement selectLedgerPageKeys = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -687,6 +768,7 @@ public:
)
);
}();
*/
PreparedStatement getToken = [this]() {
return handle_.get().prepare(
@@ -717,36 +799,6 @@ public:
);
}();
PreparedStatement selectAccountFromBeginning = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > 0
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
PreparedStatement selectAccountFromToken = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT account
FROM {}
WHERE token(account) > token(?)
PER PARTITION LIMIT 1
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "account_tx")
)
);
}();
PreparedStatement selectAccountTxForward = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -827,22 +879,6 @@ public:
);
}();
PreparedStatement selectNFTIDsByIssuer = [this]() {
return handle_.get().prepare(
fmt::format(
R"(
SELECT token_id
FROM {}
WHERE issuer = ?
AND (taxon, token_id) > ?
ORDER BY taxon ASC, token_id ASC
LIMIT ?
)",
qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
)
);
}();
PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
return handle_.get().prepare(
fmt::format(
@@ -953,6 +989,15 @@ public:
)
);
}();
// For ScyllaDB / Cassandra ONLY
std::optional<PreparedStatement> selectAccountFromBeginningScylla;
std::optional<PreparedStatement> selectAccountFromTokenScylla;
std::optional<PreparedStatement> selectNFTsByIssuerScylla;
// For AWS Keyspaces ONLY
// NOTE: AWS keyspace is not able to load cache with accounts
std::optional<PreparedStatement> selectNFTsAfterTaxonKeyspaces;
};
/**

View File

@@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const
settings.coreConnectionsPerHost = config_.get<uint32_t>("core_connections_per_host");
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
settings.writeBatchSize = config_.get<std::size_t>("write_batch_size");
settings.provider = config_.get<std::string>("provider");
if (config_.getValueView("connect_timeout").hasValue()) {
auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");

View File

@@ -38,7 +38,7 @@ namespace data::cassandra::impl {
// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
Batch::Batch(std::vector<Statement> const& statements)
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER}
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER}
{
cass_batch_set_is_idempotent(*this, cass_true);

View File

@@ -60,6 +60,11 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k
cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
// TODO: AWS keyspace reads should be local_one to save cost
if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) {
throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc)));
}
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
rc != CASS_OK) {
throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc)));

View File

@@ -45,6 +45,7 @@ struct Settings {
static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20;
static constexpr std::string kDEFAULT_PROVIDER = "scylladb";
/**
* @brief Represents the configuration of contact points for cassandra.
@@ -88,6 +89,9 @@ struct Settings {
/** @brief Size of batches when writing */
std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
/** @brief Provider to know if we are using scylladb or keyspace */
std::string provider = kDEFAULT_PROVIDER;
/** @brief Size of the IO queue */
std::optional<uint32_t> queueSizeIO = std::nullopt; // NOLINT(readability-redundant-member-init)

View File

@@ -58,14 +58,14 @@ public:
explicit Statement(std::string_view query, Args&&... args)
: ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
{
cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
bind<Args...>(std::forward<Args>(args)...);
}
/* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
{
cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
cass_statement_set_is_idempotent(*this, cass_true);
}

View File

@@ -84,6 +84,11 @@ static constexpr std::array<char const*, 1> kDATABASE_TYPE = {"cassandra"};
*/
static constexpr std::array<char const*, 2> kPROCESSING_POLICY = {"parallel", "sequent"};
/**
* @brief specific values that are accepted for database provider in config.
*/
static constexpr std::array<char const*, 2> kPROVIDER = {"scylladb", "aws_keyspace"};
/**
* @brief An interface to enforce constraints on certain values within ClioConfigDefinition.
*/
@@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE};
static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE};
static constinit OneOf gValidateLogTag{"log_tag_style", kLOG_TAGS};
static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY};
static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER};
static constinit PositiveDouble gValidatePositiveDouble{};

View File

@@ -279,6 +279,8 @@ getClioConfig()
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.provider",
ConfigValue{ConfigType::String}.defaultValue("scylladb").withConstraint(gValidateProvider)},
{"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},

View File

@@ -166,6 +166,9 @@ private:
"Maximum number of outstanding read requests. Read requests are API calls that read from the database."},
KV{.key = "database.cassandra.threads",
.value = "Represents the number of threads that will be used for database operations."},
KV{.key = "database.cassandra.provider",
.value = "The specific database backend provider we are using. Currently we only support scylladb, or "
"aws_keyspace."},
KV{.key = "database.cassandra.core_connections_per_host",
.value = "The number of core connections per host for the Cassandra database."},
KV{.key = "database.cassandra.queue_size_io",

View File

@@ -59,6 +59,7 @@ getParseSettingsConfig(boost::json::value val)
{"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")},
{"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},