fix bug and comments

This commit is contained in:
Peter Chen
2025-08-19 16:19:44 -04:00
parent d598396445
commit c3e04426d3
5 changed files with 154 additions and 106 deletions

View File

@@ -225,9 +225,11 @@ public:
{
waitForWritesToFinish();
// only run if require to write in first ledger
// !range means the table 'ledger_range' is not populated it; This would be first write to the table
// In this case, insert both min_sequence/max_sequence range into the table
if (!range_) {
executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
}
if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
@@ -514,115 +516,16 @@ public:
boost::asio::yield_context yield
) const override
{
NFTsAndCursor ret;
std::vector<ripple::uint256> nftIDs;
// --- A specific taxon is requested ---
if (taxon.has_value()) {
Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
statement.bindAt(1, *taxon);
statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
statement.bindAt(3, Limit{limit});
auto const res = executor_.read(yield, statement);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
}
// --- No taxon is specified (general pagination) ---
else {
if (settingsProvider_.getSettings().provider == "aws_keyspace") {
// --- Amazon Keyspaces Workflow ---
auto const startTaxon =
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
// Execute the first query to try and fill the page from the current taxon.
Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
firstQuery.bindAt(1, startTaxon);
firstQuery.bindAt(2, startTokenID);
firstQuery.bindAt(3, Limit{limit});
auto const firstRes = executor_.read(yield, firstQuery);
if (firstRes) {
for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
nftIDs.push_back(nftID);
}
// If the page is not full, execute the second query to get the rest.
if (nftIDs.size() < limit) {
auto const remainingLimit = limit - nftIDs.size();
Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer);
secondQuery.bindAt(1, startTaxon);
secondQuery.bindAt(2, Limit{remainingLimit});
auto const secondRes = executor_.read(yield, secondQuery);
if (secondRes) {
for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
nftIDs.push_back(nftID);
}
}
} else if (settingsProvider_.getSettings().provider == "scylladb") {
auto r = schema_->selectNFTsByIssuerScylla->bind(issuer);
r.bindAt(
1,
std::make_tuple(
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
cursorIn.value_or(ripple::uint256(0))
)
);
r.bindAt(2, Limit{limit});
auto const res = executor_.read(yield, r);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
}
nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
} else {
// --- No taxon is specified (general pagination) ---
nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield);
}
if (nftIDs.empty()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
if (nftIDs.size() == limit)
ret.cursor = nftIDs.back();
std::vector<Statement> selectNFTStatements;
selectNFTStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
return schema_->selectNFT.bind(nftID, ledgerSequence);
}
);
auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
std::vector<Statement> selectNFTURIStatements;
selectNFTURIStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
return schema_->selectNFTURI.bind(nftID, ledgerSequence);
}
);
auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
for (auto i = 0u; i < nftIDs.size(); i++) {
if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
auto [seq, owner, isBurned] = *maybeRow;
NFT nft(nftIDs[i], seq, owner, isBurned);
if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
nft.uri = *maybeUri;
ret.nfts.push_back(nft);
}
}
return ret;
return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
}
MPTHoldersAndCursor
@@ -1153,6 +1056,139 @@ private:
return true;
}
std::vector<ripple::uint256>
fetchNFTIDsByTaxon(
ripple::AccountID const& issuer,
std::uint32_t const taxon,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const
{
std::vector<ripple::uint256> nftIDs;
Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
statement.bindAt(1, taxon);
statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
statement.bindAt(3, Limit{limit});
auto const res = executor_.read(yield, statement);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
return nftIDs;
}
std::vector<ripple::uint256>
fetchNFTIDsWithoutTaxon(
ripple::AccountID const& issuer,
std::uint32_t const limit,
std::optional<ripple::uint256> const& cursorIn,
boost::asio::yield_context yield
) const
{
std::vector<ripple::uint256> nftIDs;
if (settingsProvider_.getSettings().provider == "aws_keyspace") {
// --- Amazon Keyspaces Workflow ---
auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
firstQuery.bindAt(1, startTaxon);
firstQuery.bindAt(2, startTokenID);
firstQuery.bindAt(3, Limit{limit});
auto const firstRes = executor_.read(yield, firstQuery);
if (firstRes) {
for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
nftIDs.push_back(nftID);
}
if (nftIDs.size() < limit) {
auto const remainingLimit = limit - nftIDs.size();
Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer);
secondQuery.bindAt(1, startTaxon);
secondQuery.bindAt(2, Limit{remainingLimit});
auto const secondRes = executor_.read(yield, secondQuery);
if (secondRes) {
for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
nftIDs.push_back(nftID);
}
}
} else if (settingsProvider_.getSettings().provider == "scylladb") {
auto r = schema_->selectNFTsByIssuerScylla->bind(issuer);
r.bindAt(
1,
std::make_tuple(
cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
cursorIn.value_or(ripple::uint256(0))
)
);
r.bindAt(2, Limit{limit});
auto const res = executor_.read(yield, r);
if (res && res.value().hasRows()) {
for (auto const [nftID] : extract<ripple::uint256>(res.value()))
nftIDs.push_back(nftID);
}
}
return nftIDs;
}
/**
* @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor.
*/
NFTsAndCursor
populateNFTsAndCreateCursor(
std::vector<ripple::uint256> const& nftIDs,
std::uint32_t const ledgerSequence,
std::uint32_t const limit,
boost::asio::yield_context yield
) const
{
if (nftIDs.empty()) {
LOG(log_.debug()) << "No rows returned";
return {};
}
NFTsAndCursor ret;
if (nftIDs.size() == limit)
ret.cursor = nftIDs.back();
// Prepare and execute queries to fetch NFT info and URIs in parallel.
std::vector<Statement> selectNFTStatements;
selectNFTStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
return schema_->selectNFT.bind(nftID, ledgerSequence);
}
);
std::vector<Statement> selectNFTURIStatements;
selectNFTURIStatements.reserve(nftIDs.size());
std::transform(
std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
return schema_->selectNFTURI.bind(nftID, ledgerSequence);
}
);
auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
// Combine the results into final NFT objects.
for (auto i = 0u; i < nftIDs.size(); ++i) {
if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
auto [seq, owner, isBurned] = *maybeRow;
NFT nft(nftIDs[i], seq, owner, isBurned);
if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
nft.uri = *maybeUri;
ret.nfts.push_back(nft);
}
}
return ret;
}
};
using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;

View File

@@ -36,7 +36,16 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
namespace data::cassandra::impl {
// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
/*
* There are 2 main batches of Cassandra Statements:
* LOGGED: Ensures all updates in the batch succeed together, or none do.
* Use this for critical, related changes (e.g., for the same user), but it is slower.
*
* UNLOGGED: For performance. Sends many separate updates in one network trip to be fast.
* Use this for bulk-loading unrelated data, but know there's NO all-or-nothing guarantee.
*
* More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html
*/
Batch::Batch(std::vector<Statement> const& statements)
: ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER}
{

View File

@@ -53,6 +53,7 @@ protected:
{"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},

View File

@@ -93,6 +93,7 @@ protected:
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},

View File

@@ -103,6 +103,7 @@ protected:
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
{"database.cassandra.keyspace",
ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
{"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},