From c3e04426d35c3d03571769302fbd382c589fc6fc Mon Sep 17 00:00:00 2001 From: Peter Chen Date: Tue, 19 Aug 2025 16:19:44 -0400 Subject: [PATCH] fix bug and comments --- src/data/CassandraBackend.hpp | 246 ++++++++++-------- src/data/cassandra/impl/Batch.cpp | 11 +- .../integration/data/BackendFactoryTests.cpp | 1 + .../data/cassandra/BackendTests.cpp | 1 + .../CassandraMigrationManagerTests.cpp | 1 + 5 files changed, 154 insertions(+), 106 deletions(-) diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp index 01f19d8d3..434516f3d 100644 --- a/src/data/CassandraBackend.hpp +++ b/src/data/CassandraBackend.hpp @@ -225,9 +225,11 @@ public: { waitForWritesToFinish(); - // only run if require to write in first ledger + // !range means the table 'ledger_range' is not populated it; This would be first write to the table + // In this case, insert both min_sequence/max_sequence range into the table if (!range_) { executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_); + executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_); } if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) { @@ -514,115 +516,16 @@ public: boost::asio::yield_context yield ) const override { - NFTsAndCursor ret; std::vector nftIDs; - // --- A specific taxon is requested --- if (taxon.has_value()) { - Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); - statement.bindAt(1, *taxon); - statement.bindAt(2, cursorIn.value_or(ripple::uint256(0))); - statement.bindAt(3, Limit{limit}); - - auto const res = executor_.read(yield, statement); - if (res && res.value().hasRows()) { - for (auto const [nftID] : extract(res.value())) - nftIDs.push_back(nftID); - } - } - // --- No taxon is specified (general pagination) --- - else { - if (settingsProvider_.getSettings().provider == "aws_keyspace") { - // --- Amazon Keyspaces Workflow --- - auto const startTaxon = - cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0; - auto const startTokenID = cursorIn.value_or(ripple::uint256(0)); - - // Execute the first query to try and fill the page from the current taxon. - Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); - firstQuery.bindAt(1, startTaxon); - firstQuery.bindAt(2, startTokenID); - firstQuery.bindAt(3, Limit{limit}); - - auto const firstRes = executor_.read(yield, firstQuery); - if (firstRes) { - for (auto const [nftID] : extract(firstRes.value())) - nftIDs.push_back(nftID); - } - - // If the page is not full, execute the second query to get the rest. - if (nftIDs.size() < limit) { - auto const remainingLimit = limit - nftIDs.size(); - Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer); - secondQuery.bindAt(1, startTaxon); - secondQuery.bindAt(2, Limit{remainingLimit}); - - auto const secondRes = executor_.read(yield, secondQuery); - if (secondRes) { - for (auto const [nftID] : extract(secondRes.value())) - nftIDs.push_back(nftID); - } - } - - } else if (settingsProvider_.getSettings().provider == "scylladb") { - auto r = schema_->selectNFTsByIssuerScylla->bind(issuer); - r.bindAt( - 1, - std::make_tuple( - cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0, - cursorIn.value_or(ripple::uint256(0)) - ) - ); - r.bindAt(2, Limit{limit}); - - auto const res = executor_.read(yield, r); - if (res && res.value().hasRows()) { - for (auto const [nftID] : extract(res.value())) - nftIDs.push_back(nftID); - } - } + nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield); + } else { + // --- No taxon is specified (general pagination) --- + nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield); } - if (nftIDs.empty()) { - LOG(log_.debug()) << "No rows returned"; - return {}; - } - - if (nftIDs.size() == limit) - ret.cursor = nftIDs.back(); - - std::vector selectNFTStatements; - selectNFTStatements.reserve(nftIDs.size()); - - std::transform( - std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) { - return schema_->selectNFT.bind(nftID, ledgerSequence); - } - ); - - auto const nftInfos = executor_.readEach(yield, selectNFTStatements); - - std::vector selectNFTURIStatements; - selectNFTURIStatements.reserve(nftIDs.size()); - - std::transform( - std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) { - return schema_->selectNFTURI.bind(nftID, ledgerSequence); - } - ); - - auto const nftUris = executor_.readEach(yield, selectNFTURIStatements); - - for (auto i = 0u; i < nftIDs.size(); i++) { - if (auto const maybeRow = nftInfos[i].template get(); maybeRow) { - auto [seq, owner, isBurned] = *maybeRow; - NFT nft(nftIDs[i], seq, owner, isBurned); - if (auto const maybeUri = nftUris[i].template get(); maybeUri) - nft.uri = *maybeUri; - ret.nfts.push_back(nft); - } - } - return ret; + return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield); } MPTHoldersAndCursor @@ -1153,6 +1056,139 @@ private: return true; } + + std::vector + fetchNFTIDsByTaxon( + ripple::AccountID const& issuer, + std::uint32_t const taxon, + std::uint32_t const limit, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const + { + std::vector nftIDs; + Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + statement.bindAt(1, taxon); + statement.bindAt(2, cursorIn.value_or(ripple::uint256(0))); + statement.bindAt(3, Limit{limit}); + + auto const res = executor_.read(yield, statement); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); + } + return nftIDs; + } + + std::vector + fetchNFTIDsWithoutTaxon( + ripple::AccountID const& issuer, + std::uint32_t const limit, + std::optional const& cursorIn, + boost::asio::yield_context yield + ) const + { + std::vector nftIDs; + if (settingsProvider_.getSettings().provider == "aws_keyspace") { + // --- Amazon Keyspaces Workflow --- + auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0; + auto const startTokenID = cursorIn.value_or(ripple::uint256(0)); + + Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer); + firstQuery.bindAt(1, startTaxon); + firstQuery.bindAt(2, startTokenID); + firstQuery.bindAt(3, Limit{limit}); + + auto const firstRes = executor_.read(yield, firstQuery); + if (firstRes) { + for (auto const [nftID] : extract(firstRes.value())) + nftIDs.push_back(nftID); + } + + if (nftIDs.size() < limit) { + auto const remainingLimit = limit - nftIDs.size(); + Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer); + secondQuery.bindAt(1, startTaxon); + secondQuery.bindAt(2, Limit{remainingLimit}); + + auto const secondRes = executor_.read(yield, secondQuery); + if (secondRes) { + for (auto const [nftID] : extract(secondRes.value())) + nftIDs.push_back(nftID); + } + } + } else if (settingsProvider_.getSettings().provider == "scylladb") { + auto r = schema_->selectNFTsByIssuerScylla->bind(issuer); + r.bindAt( + 1, + std::make_tuple( + cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0, + cursorIn.value_or(ripple::uint256(0)) + ) + ); + r.bindAt(2, Limit{limit}); + + auto const res = executor_.read(yield, r); + if (res && res.value().hasRows()) { + for (auto const [nftID] : extract(res.value())) + nftIDs.push_back(nftID); + } + } + return nftIDs; + } + + /** + * @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor. + */ + NFTsAndCursor + populateNFTsAndCreateCursor( + std::vector const& nftIDs, + std::uint32_t const ledgerSequence, + std::uint32_t const limit, + boost::asio::yield_context yield + ) const + { + if (nftIDs.empty()) { + LOG(log_.debug()) << "No rows returned"; + return {}; + } + + NFTsAndCursor ret; + if (nftIDs.size() == limit) + ret.cursor = nftIDs.back(); + + // Prepare and execute queries to fetch NFT info and URIs in parallel. + std::vector selectNFTStatements; + selectNFTStatements.reserve(nftIDs.size()); + std::transform( + std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) { + return schema_->selectNFT.bind(nftID, ledgerSequence); + } + ); + + std::vector selectNFTURIStatements; + selectNFTURIStatements.reserve(nftIDs.size()); + std::transform( + std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) { + return schema_->selectNFTURI.bind(nftID, ledgerSequence); + } + ); + + auto const nftInfos = executor_.readEach(yield, selectNFTStatements); + auto const nftUris = executor_.readEach(yield, selectNFTURIStatements); + + // Combine the results into final NFT objects. + for (auto i = 0u; i < nftIDs.size(); ++i) { + if (auto const maybeRow = nftInfos[i].template get(); maybeRow) { + auto [seq, owner, isBurned] = *maybeRow; + NFT nft(nftIDs[i], seq, owner, isBurned); + if (auto const maybeUri = nftUris[i].template get(); maybeUri) + nft.uri = *maybeUri; + ret.nfts.push_back(nft); + } + } + return ret; + } }; using CassandraBackend = BasicCassandraBackend>; diff --git a/src/data/cassandra/impl/Batch.cpp b/src/data/cassandra/impl/Batch.cpp index 71d9d2da5..48841a278 100644 --- a/src/data/cassandra/impl/Batch.cpp +++ b/src/data/cassandra/impl/Batch.cpp @@ -36,7 +36,16 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); }; namespace data::cassandra::impl { -// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases +/* + * There are 2 main batches of Cassandra Statements: + * LOGGED: Ensures all updates in the batch succeed together, or none do. + * Use this for critical, related changes (e.g., for the same user), but it is slower. + * + * UNLOGGED: For performance. Sends many separate updates in one network trip to be fast. + * Use this for bulk-loading unrelated data, but know there's NO all-or-nothing guarantee. + * + * More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html + */ Batch::Batch(std::vector const& statements) : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER} { diff --git a/tests/integration/data/BackendFactoryTests.cpp b/tests/integration/data/BackendFactoryTests.cpp index ab97cd6d5..cfe2371a2 100644 --- a/tests/integration/data/BackendFactoryTests.cpp +++ b/tests/integration/data/BackendFactoryTests.cpp @@ -53,6 +53,7 @@ protected: {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)}, diff --git a/tests/integration/data/cassandra/BackendTests.cpp b/tests/integration/data/cassandra/BackendTests.cpp index cd79da215..1cf916b62 100644 --- a/tests/integration/data/cassandra/BackendTests.cpp +++ b/tests/integration/data/cassandra/BackendTests.cpp @@ -93,6 +93,7 @@ protected: {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()}, {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)}, diff --git a/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp b/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp index 393b4249c..3321bc22d 100644 --- a/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp +++ b/tests/integration/migration/cassandra/CassandraMigrationManagerTests.cpp @@ -103,6 +103,7 @@ protected: ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)}, {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)}, + {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("scylladb")}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)}, {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},