mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	Compare commits
	
		
			16 Commits
		
	
	
		
			7e4e12385f
			...
			experiment
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					7eaf0005e4 | ||
| 
						 | 
					497721ee7c | ||
| 
						 | 
					26530108e3 | ||
| 
						 | 
					fc88abdaeb | ||
| 
						 | 
					8bc36c2c0b | ||
| 
						 | 
					4e9558f76b | ||
| 
						 | 
					84db880ce7 | ||
| 
						 | 
					f88ce31363 | ||
| 
						 | 
					e03f5e46c0 | ||
| 
						 | 
					30da8d8f63 | ||
| 
						 | 
					8f6bec2e25 | ||
| 
						 | 
					0d9a83fd4d | ||
| 
						 | 
					47c2af0421 | ||
| 
						 | 
					c3e04426d3 | ||
| 
						 | 
					d598396445 | ||
| 
						 | 
					bbd2884e3b | 
@@ -89,6 +89,14 @@ This document provides a list of all available Clio configuration properties in
 | 
			
		||||
- **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
 | 
			
		||||
- **Description**: Represents the number of threads that will be used for database operations.
 | 
			
		||||
 | 
			
		||||
### database.cassandra.provider
 | 
			
		||||
 | 
			
		||||
- **Required**: True
 | 
			
		||||
- **Type**: string
 | 
			
		||||
- **Default value**: `cassandra`
 | 
			
		||||
- **Constraints**: The value must be one of the following: `cassandra`, `aws_keyspace`.
 | 
			
		||||
- **Description**: The specific database backend provider we are using.
 | 
			
		||||
 | 
			
		||||
### database.cassandra.core_connections_per_host
 | 
			
		||||
 | 
			
		||||
- **Required**: True
 | 
			
		||||
 
 | 
			
		||||
@@ -225,8 +225,11 @@ public:
 | 
			
		||||
    {
 | 
			
		||||
        waitForWritesToFinish();
 | 
			
		||||
 | 
			
		||||
        if (!range_) {
 | 
			
		||||
            executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);
 | 
			
		||||
        // !range_.has_value() means the table 'ledger_range' is not populated; This would be the first write to the
 | 
			
		||||
        // table In this case, insert both min_sequence/max_sequence range into the table
 | 
			
		||||
        if (!range_.has_value()) {
 | 
			
		||||
            executor_.writeSync(schema_->insertLedgerRange, false, ledgerSequence_);
 | 
			
		||||
            executor_.writeSync(schema_->insertLedgerRange, true, ledgerSequence_);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (not executeSyncUpdate(schema_->updateLedgerRange.bind(ledgerSequence_, true, ledgerSequence_ - 1))) {
 | 
			
		||||
@@ -513,80 +516,14 @@ public:
 | 
			
		||||
        boost::asio::yield_context yield
 | 
			
		||||
    ) const override
 | 
			
		||||
    {
 | 
			
		||||
        NFTsAndCursor ret;
 | 
			
		||||
 | 
			
		||||
        Statement const idQueryStatement = [&taxon, &issuer, &cursorIn, &limit, this]() {
 | 
			
		||||
            if (taxon.has_value()) {
 | 
			
		||||
                auto r = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
 | 
			
		||||
                r.bindAt(1, *taxon);
 | 
			
		||||
                r.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
 | 
			
		||||
                r.bindAt(3, Limit{limit});
 | 
			
		||||
                return r;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            auto r = schema_->selectNFTIDsByIssuer.bind(issuer);
 | 
			
		||||
            r.bindAt(
 | 
			
		||||
                1,
 | 
			
		||||
                std::make_tuple(
 | 
			
		||||
                    cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
 | 
			
		||||
                    cursorIn.value_or(ripple::uint256(0))
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
            r.bindAt(2, Limit{limit});
 | 
			
		||||
            return r;
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        // Query for all the NFTs issued by the account, potentially filtered by the taxon
 | 
			
		||||
        auto const res = executor_.read(yield, idQueryStatement);
 | 
			
		||||
 | 
			
		||||
        auto const& idQueryResults = res.value();
 | 
			
		||||
        if (not idQueryResults.hasRows()) {
 | 
			
		||||
            LOG(log_.debug()) << "No rows returned";
 | 
			
		||||
            return {};
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::vector<ripple::uint256> nftIDs;
 | 
			
		||||
        for (auto const [nftID] : extract<ripple::uint256>(idQueryResults))
 | 
			
		||||
            nftIDs.push_back(nftID);
 | 
			
		||||
 | 
			
		||||
        if (nftIDs.empty())
 | 
			
		||||
            return ret;
 | 
			
		||||
 | 
			
		||||
        if (nftIDs.size() == limit)
 | 
			
		||||
            ret.cursor = nftIDs.back();
 | 
			
		||||
 | 
			
		||||
        std::vector<Statement> selectNFTStatements;
 | 
			
		||||
        selectNFTStatements.reserve(nftIDs.size());
 | 
			
		||||
 | 
			
		||||
        std::transform(
 | 
			
		||||
            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
 | 
			
		||||
                return schema_->selectNFT.bind(nftID, ledgerSequence);
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
 | 
			
		||||
 | 
			
		||||
        std::vector<Statement> selectNFTURIStatements;
 | 
			
		||||
        selectNFTURIStatements.reserve(nftIDs.size());
 | 
			
		||||
 | 
			
		||||
        std::transform(
 | 
			
		||||
            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
 | 
			
		||||
                return schema_->selectNFTURI.bind(nftID, ledgerSequence);
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
 | 
			
		||||
 | 
			
		||||
        for (auto i = 0u; i < nftIDs.size(); i++) {
 | 
			
		||||
            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
 | 
			
		||||
                auto [seq, owner, isBurned] = *maybeRow;
 | 
			
		||||
                NFT nft(nftIDs[i], seq, owner, isBurned);
 | 
			
		||||
                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
 | 
			
		||||
                    nft.uri = *maybeUri;
 | 
			
		||||
                ret.nfts.push_back(nft);
 | 
			
		||||
            }
 | 
			
		||||
        if (taxon.has_value()) {
 | 
			
		||||
            nftIDs = fetchNFTIDsByTaxon(issuer, *taxon, limit, cursorIn, yield);
 | 
			
		||||
        } else {
 | 
			
		||||
            nftIDs = fetchNFTIDsWithoutTaxon(issuer, limit, cursorIn, yield);
 | 
			
		||||
        }
 | 
			
		||||
        return ret;
 | 
			
		||||
 | 
			
		||||
        return populateNFTsAndCreateCursor(nftIDs, ledgerSequence, limit, yield);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    MPTHoldersAndCursor
 | 
			
		||||
@@ -803,8 +740,9 @@ public:
 | 
			
		||||
        std::optional<ripple::AccountID> lastItem;
 | 
			
		||||
 | 
			
		||||
        while (liveAccounts.size() < number) {
 | 
			
		||||
            Statement const statement = lastItem ? schema_->selectAccountFromToken.bind(*lastItem, Limit{pageSize})
 | 
			
		||||
                                                 : schema_->selectAccountFromBeginning.bind(Limit{pageSize});
 | 
			
		||||
            Statement const statement = lastItem
 | 
			
		||||
                ? schema_->selectAccountFromTokenScylla->bind(*lastItem, Limit{pageSize})
 | 
			
		||||
                : schema_->selectAccountFromBeginningScylla->bind(Limit{pageSize});
 | 
			
		||||
 | 
			
		||||
            auto const res = executor_.read(yield, statement);
 | 
			
		||||
            if (res) {
 | 
			
		||||
@@ -1116,6 +1054,139 @@ private:
 | 
			
		||||
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::vector<ripple::uint256>
 | 
			
		||||
    fetchNFTIDsByTaxon(
 | 
			
		||||
        ripple::AccountID const& issuer,
 | 
			
		||||
        std::uint32_t const taxon,
 | 
			
		||||
        std::uint32_t const limit,
 | 
			
		||||
        std::optional<ripple::uint256> const& cursorIn,
 | 
			
		||||
        boost::asio::yield_context yield
 | 
			
		||||
    ) const
 | 
			
		||||
    {
 | 
			
		||||
        std::vector<ripple::uint256> nftIDs;
 | 
			
		||||
        Statement statement = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
 | 
			
		||||
        statement.bindAt(1, taxon);
 | 
			
		||||
        statement.bindAt(2, cursorIn.value_or(ripple::uint256(0)));
 | 
			
		||||
        statement.bindAt(3, Limit{limit});
 | 
			
		||||
 | 
			
		||||
        auto const res = executor_.read(yield, statement);
 | 
			
		||||
        if (res && res.value().hasRows()) {
 | 
			
		||||
            for (auto const [nftID] : extract<ripple::uint256>(res.value()))
 | 
			
		||||
                nftIDs.push_back(nftID);
 | 
			
		||||
        }
 | 
			
		||||
        return nftIDs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::vector<ripple::uint256>
 | 
			
		||||
    fetchNFTIDsWithoutTaxon(
 | 
			
		||||
        ripple::AccountID const& issuer,
 | 
			
		||||
        std::uint32_t const limit,
 | 
			
		||||
        std::optional<ripple::uint256> const& cursorIn,
 | 
			
		||||
        boost::asio::yield_context yield
 | 
			
		||||
    ) const
 | 
			
		||||
    {
 | 
			
		||||
        std::vector<ripple::uint256> nftIDs;
 | 
			
		||||
        if (settingsProvider_.getSettings().provider == "aws_keyspace") {
 | 
			
		||||
            // --- Amazon Keyspaces Workflow ---
 | 
			
		||||
            auto const startTaxon = cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0;
 | 
			
		||||
            auto const startTokenID = cursorIn.value_or(ripple::uint256(0));
 | 
			
		||||
 | 
			
		||||
            Statement firstQuery = schema_->selectNFTIDsByIssuerTaxon.bind(issuer);
 | 
			
		||||
            firstQuery.bindAt(1, startTaxon);
 | 
			
		||||
            firstQuery.bindAt(2, startTokenID);
 | 
			
		||||
            firstQuery.bindAt(3, Limit{limit});
 | 
			
		||||
 | 
			
		||||
            auto const firstRes = executor_.read(yield, firstQuery);
 | 
			
		||||
            if (firstRes) {
 | 
			
		||||
                for (auto const [nftID] : extract<ripple::uint256>(firstRes.value()))
 | 
			
		||||
                    nftIDs.push_back(nftID);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (nftIDs.size() < limit) {
 | 
			
		||||
                auto const remainingLimit = limit - nftIDs.size();
 | 
			
		||||
                Statement secondQuery = schema_->selectNFTsAfterTaxonKeyspaces->bind(issuer);
 | 
			
		||||
                secondQuery.bindAt(1, startTaxon);
 | 
			
		||||
                secondQuery.bindAt(2, Limit{remainingLimit});
 | 
			
		||||
 | 
			
		||||
                auto const secondRes = executor_.read(yield, secondQuery);
 | 
			
		||||
                if (secondRes) {
 | 
			
		||||
                    for (auto const [nftID] : extract<ripple::uint256>(secondRes.value()))
 | 
			
		||||
                        nftIDs.push_back(nftID);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else if (settingsProvider_.getSettings().provider == "scylladb") {
 | 
			
		||||
            auto r = schema_->selectNFTsByIssuerScylla->bind(issuer);
 | 
			
		||||
            r.bindAt(
 | 
			
		||||
                1,
 | 
			
		||||
                std::make_tuple(
 | 
			
		||||
                    cursorIn.has_value() ? ripple::nft::toUInt32(ripple::nft::getTaxon(*cursorIn)) : 0,
 | 
			
		||||
                    cursorIn.value_or(ripple::uint256(0))
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
            r.bindAt(2, Limit{limit});
 | 
			
		||||
 | 
			
		||||
            auto const res = executor_.read(yield, r);
 | 
			
		||||
            if (res && res.value().hasRows()) {
 | 
			
		||||
                for (auto const [nftID] : extract<ripple::uint256>(res.value()))
 | 
			
		||||
                    nftIDs.push_back(nftID);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return nftIDs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Takes a list of NFT IDs, fetches their full data, and assembles the final result with a cursor.
 | 
			
		||||
     */
 | 
			
		||||
    NFTsAndCursor
 | 
			
		||||
    populateNFTsAndCreateCursor(
 | 
			
		||||
        std::vector<ripple::uint256> const& nftIDs,
 | 
			
		||||
        std::uint32_t const ledgerSequence,
 | 
			
		||||
        std::uint32_t const limit,
 | 
			
		||||
        boost::asio::yield_context yield
 | 
			
		||||
    ) const
 | 
			
		||||
    {
 | 
			
		||||
        if (nftIDs.empty()) {
 | 
			
		||||
            LOG(log_.debug()) << "No rows returned";
 | 
			
		||||
            return {};
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        NFTsAndCursor ret;
 | 
			
		||||
        if (nftIDs.size() == limit)
 | 
			
		||||
            ret.cursor = nftIDs.back();
 | 
			
		||||
 | 
			
		||||
        // Prepare and execute queries to fetch NFT info and URIs in parallel.
 | 
			
		||||
        std::vector<Statement> selectNFTStatements;
 | 
			
		||||
        selectNFTStatements.reserve(nftIDs.size());
 | 
			
		||||
        std::transform(
 | 
			
		||||
            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTStatements), [&](auto const& nftID) {
 | 
			
		||||
                return schema_->selectNFT.bind(nftID, ledgerSequence);
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        std::vector<Statement> selectNFTURIStatements;
 | 
			
		||||
        selectNFTURIStatements.reserve(nftIDs.size());
 | 
			
		||||
        std::transform(
 | 
			
		||||
            std::cbegin(nftIDs), std::cend(nftIDs), std::back_inserter(selectNFTURIStatements), [&](auto const& nftID) {
 | 
			
		||||
                return schema_->selectNFTURI.bind(nftID, ledgerSequence);
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        auto const nftInfos = executor_.readEach(yield, selectNFTStatements);
 | 
			
		||||
        auto const nftUris = executor_.readEach(yield, selectNFTURIStatements);
 | 
			
		||||
 | 
			
		||||
        // Combine the results into final NFT objects.
 | 
			
		||||
        for (auto i = 0u; i < nftIDs.size(); ++i) {
 | 
			
		||||
            if (auto const maybeRow = nftInfos[i].template get<uint32_t, ripple::AccountID, bool>(); maybeRow) {
 | 
			
		||||
                auto [seq, owner, isBurned] = *maybeRow;
 | 
			
		||||
                NFT nft(nftIDs[i], seq, owner, isBurned);
 | 
			
		||||
                if (auto const maybeUri = nftUris[i].template get<ripple::Blob>(); maybeUri)
 | 
			
		||||
                    nft.uri = *maybeUri;
 | 
			
		||||
                ret.nfts.push_back(nft);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return ret;
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
using CassandraBackend = BasicCassandraBackend<SettingsProvider, impl::DefaultExecutionStrategy<>>;
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,7 @@
 | 
			
		||||
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <string_view>
 | 
			
		||||
#include <vector>
 | 
			
		||||
@@ -347,6 +348,86 @@ public:
 | 
			
		||||
        Statements(SettingsProviderType const& settingsProvider, Handle const& handle)
 | 
			
		||||
            : settingsProvider_{settingsProvider}, handle_{std::cref(handle)}
 | 
			
		||||
        {
 | 
			
		||||
            // initialize scylladb supported queries
 | 
			
		||||
            if (settingsProvider_.get().getSettings().provider == "scylladb") {
 | 
			
		||||
                selectAccountFromBeginningScylla = [this]() {
 | 
			
		||||
                    return handle_.get().prepare(
 | 
			
		||||
                        fmt::format(
 | 
			
		||||
                            R"(
 | 
			
		||||
                        SELECT account
 | 
			
		||||
                          FROM {}
 | 
			
		||||
                         WHERE token(account) > 0
 | 
			
		||||
                           PER PARTITION LIMIT 1
 | 
			
		||||
                         LIMIT ?
 | 
			
		||||
                        )",
 | 
			
		||||
                            qualifiedTableName(settingsProvider_.get(), "account_tx")
 | 
			
		||||
                        )
 | 
			
		||||
                    );
 | 
			
		||||
                }();
 | 
			
		||||
 | 
			
		||||
                selectAccountFromTokenScylla = [this]() {
 | 
			
		||||
                    return handle_.get().prepare(
 | 
			
		||||
                        fmt::format(
 | 
			
		||||
                            R"(
 | 
			
		||||
                        SELECT account
 | 
			
		||||
                          FROM {}
 | 
			
		||||
                         WHERE token(account) > token(?)
 | 
			
		||||
                           PER PARTITION LIMIT 1
 | 
			
		||||
                         LIMIT ?
 | 
			
		||||
                        )",
 | 
			
		||||
                            qualifiedTableName(settingsProvider_.get(), "account_tx")
 | 
			
		||||
                        )
 | 
			
		||||
                    );
 | 
			
		||||
                }();
 | 
			
		||||
 | 
			
		||||
                selectNFTsByIssuerScylla = [this]() {
 | 
			
		||||
                    return handle_.get().prepare(
 | 
			
		||||
                        fmt::format(
 | 
			
		||||
                            R"(
 | 
			
		||||
                        SELECT token_id
 | 
			
		||||
                          FROM {}
 | 
			
		||||
                         WHERE issuer = ?
 | 
			
		||||
                           AND (taxon, token_id) > ?
 | 
			
		||||
                        ORDER BY taxon ASC, token_id ASC
 | 
			
		||||
                         LIMIT ?
 | 
			
		||||
                        )",
 | 
			
		||||
                            qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
 | 
			
		||||
                        )
 | 
			
		||||
                    );
 | 
			
		||||
                }();
 | 
			
		||||
 | 
			
		||||
                updateLedgerRange = [this]() {
 | 
			
		||||
                    return handle_.get().prepare(
 | 
			
		||||
                        fmt::format(
 | 
			
		||||
                            R"(
 | 
			
		||||
                UPDATE {}
 | 
			
		||||
                   SET sequence = ?
 | 
			
		||||
                 WHERE is_latest = ?
 | 
			
		||||
                    IF sequence IN (?, null)
 | 
			
		||||
                    )",
 | 
			
		||||
                            qualifiedTableName(settingsProvider_.get(), "ledger_range")
 | 
			
		||||
                        )
 | 
			
		||||
                    );
 | 
			
		||||
                }();
 | 
			
		||||
 | 
			
		||||
                // AWS_keyspace supported queries
 | 
			
		||||
            } else if (settingsProvider_.get().getSettings().provider == "aws_keyspace") {
 | 
			
		||||
                selectNFTsAfterTaxonKeyspaces = [this]() {
 | 
			
		||||
                    return handle_.get().prepare(
 | 
			
		||||
                        fmt::format(
 | 
			
		||||
                            R"(
 | 
			
		||||
                            SELECT token_id
 | 
			
		||||
                              FROM {}
 | 
			
		||||
                             WHERE issuer = ?
 | 
			
		||||
                               AND taxon > ?
 | 
			
		||||
                          ORDER BY taxon ASC, token_id ASC
 | 
			
		||||
                             LIMIT ?
 | 
			
		||||
                            )",
 | 
			
		||||
                            qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
 | 
			
		||||
                        )
 | 
			
		||||
                    );
 | 
			
		||||
                }();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        //
 | 
			
		||||
@@ -526,6 +607,17 @@ public:
 | 
			
		||||
        // Update (and "delete") queries
 | 
			
		||||
        //
 | 
			
		||||
 | 
			
		||||
        PreparedStatement insertLedgerRange = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
                    R"(
 | 
			
		||||
                    INSERT INTO {} (is_latest, sequence) VALUES (?, ?) IF NOT EXISTS
 | 
			
		||||
                    )",
 | 
			
		||||
                    qualifiedTableName(settingsProvider_.get(), "ledger_range")
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        PreparedStatement updateLedgerRange = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
@@ -533,7 +625,7 @@ public:
 | 
			
		||||
                UPDATE {}
 | 
			
		||||
                   SET sequence = ?
 | 
			
		||||
                 WHERE is_latest = ?
 | 
			
		||||
                    IF sequence IN (?, null)
 | 
			
		||||
                    IF sequence = ?
 | 
			
		||||
                )",
 | 
			
		||||
                    qualifiedTableName(settingsProvider_.get(), "ledger_range")
 | 
			
		||||
                )
 | 
			
		||||
@@ -654,6 +746,10 @@ public:
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        /*
 | 
			
		||||
        Currently, these two SELECT statements is not used.
 | 
			
		||||
        If we ever use them, will need to change the PER PARTITION LIMIT to support for Keyspace
 | 
			
		||||
 | 
			
		||||
        PreparedStatement selectLedgerPageKeys = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
@@ -687,6 +783,7 @@ public:
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
        */
 | 
			
		||||
 | 
			
		||||
        PreparedStatement getToken = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
@@ -717,36 +814,6 @@ public:
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        PreparedStatement selectAccountFromBeginning = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
                    R"(
 | 
			
		||||
                SELECT account
 | 
			
		||||
                  FROM {}
 | 
			
		||||
                 WHERE token(account) > 0
 | 
			
		||||
                   PER PARTITION LIMIT 1
 | 
			
		||||
                 LIMIT ?
 | 
			
		||||
                )",
 | 
			
		||||
                    qualifiedTableName(settingsProvider_.get(), "account_tx")
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        PreparedStatement selectAccountFromToken = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
                    R"(
 | 
			
		||||
                SELECT account
 | 
			
		||||
                  FROM {}
 | 
			
		||||
                 WHERE token(account) > token(?)
 | 
			
		||||
                   PER PARTITION LIMIT 1
 | 
			
		||||
                 LIMIT ?
 | 
			
		||||
                )",
 | 
			
		||||
                    qualifiedTableName(settingsProvider_.get(), "account_tx")
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        PreparedStatement selectAccountTxForward = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
@@ -827,22 +894,6 @@ public:
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        PreparedStatement selectNFTIDsByIssuer = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
                    R"(
 | 
			
		||||
                SELECT token_id
 | 
			
		||||
                  FROM {}
 | 
			
		||||
                 WHERE issuer = ?
 | 
			
		||||
                   AND (taxon, token_id) > ?
 | 
			
		||||
              ORDER BY taxon ASC, token_id ASC
 | 
			
		||||
                 LIMIT ?
 | 
			
		||||
                )",
 | 
			
		||||
                    qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
 | 
			
		||||
            return handle_.get().prepare(
 | 
			
		||||
                fmt::format(
 | 
			
		||||
@@ -953,6 +1004,15 @@ public:
 | 
			
		||||
                )
 | 
			
		||||
            );
 | 
			
		||||
        }();
 | 
			
		||||
 | 
			
		||||
        // For ScyllaDB / Cassandra ONLY
 | 
			
		||||
        std::optional<PreparedStatement> selectAccountFromBeginningScylla;
 | 
			
		||||
        std::optional<PreparedStatement> selectAccountFromTokenScylla;
 | 
			
		||||
        std::optional<PreparedStatement> selectNFTsByIssuerScylla;
 | 
			
		||||
 | 
			
		||||
        // For AWS Keyspaces ONLY
 | 
			
		||||
        // NOTE: AWS keyspace is not able to load cache with accounts
 | 
			
		||||
        std::optional<PreparedStatement> selectNFTsAfterTaxonKeyspaces;
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
 
 | 
			
		||||
@@ -97,6 +97,7 @@ SettingsProvider::parseSettings() const
 | 
			
		||||
    settings.coreConnectionsPerHost = config_.get<uint32_t>("core_connections_per_host");
 | 
			
		||||
    settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
 | 
			
		||||
    settings.writeBatchSize = config_.get<std::size_t>("write_batch_size");
 | 
			
		||||
    settings.provider = config_.get<std::string>("provider");
 | 
			
		||||
 | 
			
		||||
    if (config_.getValueView("connect_timeout").hasValue()) {
 | 
			
		||||
        auto const connectTimeoutSecond = config_.get<uint32_t>("connect_timeout");
 | 
			
		||||
 
 | 
			
		||||
@@ -36,9 +36,18 @@ constexpr auto kBATCH_DELETER = [](CassBatch* ptr) { cass_batch_free(ptr); };
 | 
			
		||||
 | 
			
		||||
namespace data::cassandra::impl {
 | 
			
		||||
 | 
			
		||||
// TODO: Use an appropriate value instead of CASS_BATCH_TYPE_LOGGED for different use cases
 | 
			
		||||
/*
 | 
			
		||||
 * There are 2 main batches of Cassandra Statements:
 | 
			
		||||
 * LOGGED: Ensures all updates in the batch succeed together, or none do.
 | 
			
		||||
 * Use this for critical, related changes (e.g., for the same user), but it is slower.
 | 
			
		||||
 *
 | 
			
		||||
 * UNLOGGED: For performance. Sends many separate updates in one network trip to be fast.
 | 
			
		||||
 * Use this for bulk-loading unrelated data, but know there's NO all-or-nothing guarantee.
 | 
			
		||||
 *
 | 
			
		||||
 * More info here: https://docs.datastax.com/en/developer/cpp-driver-dse/1.10/features/basics/batches/index.html
 | 
			
		||||
 */
 | 
			
		||||
Batch::Batch(std::vector<Statement> const& statements)
 | 
			
		||||
    : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_LOGGED), kBATCH_DELETER}
 | 
			
		||||
    : ManagedObject{cass_batch_new(CASS_BATCH_TYPE_UNLOGGED), kBATCH_DELETER}
 | 
			
		||||
{
 | 
			
		||||
    cass_batch_set_is_idempotent(*this, cass_true);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -60,6 +60,13 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), k
 | 
			
		||||
    cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
 | 
			
		||||
    cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
 | 
			
		||||
 | 
			
		||||
    // TODO: AWS keyspace reads should be local_one to save cost
 | 
			
		||||
    if (settings.provider == "aws_keyspace") {
 | 
			
		||||
        if (auto const rc = cass_cluster_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM); rc != CASS_OK) {
 | 
			
		||||
            throw std::runtime_error(fmt::format("Error setting cassandra consistency: {}", cass_error_desc(rc)));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
 | 
			
		||||
        rc != CASS_OK) {
 | 
			
		||||
        throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc)));
 | 
			
		||||
 
 | 
			
		||||
@@ -45,6 +45,7 @@ struct Settings {
 | 
			
		||||
    static constexpr uint32_t kDEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
 | 
			
		||||
    static constexpr uint32_t kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
 | 
			
		||||
    static constexpr std::size_t kDEFAULT_BATCH_SIZE = 20;
 | 
			
		||||
    static constexpr std::string kDEFAULT_PROVIDER = "cassandra";
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Represents the configuration of contact points for cassandra.
 | 
			
		||||
@@ -83,11 +84,14 @@ struct Settings {
 | 
			
		||||
    uint32_t maxReadRequestsOutstanding = kDEFAULT_MAX_READ_REQUESTS_OUTSTANDING;
 | 
			
		||||
 | 
			
		||||
    /** @brief The number of connection per host to always have active */
 | 
			
		||||
    uint32_t coreConnectionsPerHost = 1u;
 | 
			
		||||
    uint32_t coreConnectionsPerHost = 3u;
 | 
			
		||||
 | 
			
		||||
    /** @brief Size of batches when writing */
 | 
			
		||||
    std::size_t writeBatchSize = kDEFAULT_BATCH_SIZE;
 | 
			
		||||
 | 
			
		||||
    /** @brief Provider to know if we are using scylladb or keyspace */
 | 
			
		||||
    std::string provider = kDEFAULT_PROVIDER;
 | 
			
		||||
 | 
			
		||||
    /** @brief Size of the IO queue */
 | 
			
		||||
    std::optional<uint32_t> queueSizeIO = std::nullopt;  // NOLINT(readability-redundant-member-init)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -58,14 +58,14 @@ public:
 | 
			
		||||
    explicit Statement(std::string_view query, Args&&... args)
 | 
			
		||||
        : ManagedObject{cass_statement_new_n(query.data(), query.size(), sizeof...(args)), kDELETER}
 | 
			
		||||
    {
 | 
			
		||||
        cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
 | 
			
		||||
        cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
 | 
			
		||||
        cass_statement_set_is_idempotent(*this, cass_true);
 | 
			
		||||
        bind<Args...>(std::forward<Args>(args)...);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* implicit */ Statement(CassStatement* ptr) : ManagedObject{ptr, kDELETER}
 | 
			
		||||
    {
 | 
			
		||||
        cass_statement_set_consistency(*this, CASS_CONSISTENCY_QUORUM);
 | 
			
		||||
        cass_statement_set_consistency(*this, CASS_CONSISTENCY_LOCAL_QUORUM);
 | 
			
		||||
        cass_statement_set_is_idempotent(*this, cass_true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -84,6 +84,11 @@ static constexpr std::array<char const*, 1> kDATABASE_TYPE = {"cassandra"};
 | 
			
		||||
 */
 | 
			
		||||
static constexpr std::array<char const*, 2> kPROCESSING_POLICY = {"parallel", "sequent"};
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief specific values that are accepted for database provider in config.
 | 
			
		||||
 */
 | 
			
		||||
static constexpr std::array<char const*, 2> kPROVIDER = {"cassandra", "aws_keyspace"};
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief An interface to enforce constraints on certain values within ClioConfigDefinition.
 | 
			
		||||
 */
 | 
			
		||||
@@ -470,6 +475,7 @@ static constinit OneOf gValidateCassandraName{"database.type", kDATABASE_TYPE};
 | 
			
		||||
static constinit OneOf gValidateLoadMode{"cache.load", kLOAD_CACHE_MODE};
 | 
			
		||||
static constinit OneOf gValidateLogTag{"log.tag_style", kLOG_TAGS};
 | 
			
		||||
static constinit OneOf gValidateProcessingPolicy{"server.processing_policy", kPROCESSING_POLICY};
 | 
			
		||||
static constinit OneOf gValidateProvider{"database.cassandra.provider", kPROVIDER};
 | 
			
		||||
 | 
			
		||||
static constinit PositiveDouble gValidatePositiveDouble{};
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -285,6 +285,8 @@ getClioConfig()
 | 
			
		||||
         {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
         {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
         {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
         {"database.cassandra.provider",
 | 
			
		||||
          ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateProvider)},
 | 
			
		||||
 | 
			
		||||
         {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
 | 
			
		||||
         {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
 | 
			
		||||
 
 | 
			
		||||
@@ -173,6 +173,7 @@ This document provides a list of all available Clio configuration properties in
 | 
			
		||||
               "Maximum number of outstanding read requests. Read requests are API calls that read from the database."},
 | 
			
		||||
        KV{.key = "database.cassandra.threads",
 | 
			
		||||
           .value = "Represents the number of threads that will be used for database operations."},
 | 
			
		||||
        KV{.key = "database.cassandra.provider", .value = "The specific database backend provider we are using."},
 | 
			
		||||
        KV{.key = "database.cassandra.core_connections_per_host",
 | 
			
		||||
           .value = "The number of core connections per host for the Cassandra database."},
 | 
			
		||||
        KV{.key = "database.cassandra.queue_size_io",
 | 
			
		||||
 
 | 
			
		||||
@@ -44,6 +44,7 @@ using namespace util::config;
 | 
			
		||||
 | 
			
		||||
struct BackendCassandraFactoryTest : SyncAsioContextTest, util::prometheus::WithPrometheus {
 | 
			
		||||
    static constexpr auto kKEYSPACE = "factory_test";
 | 
			
		||||
    static constexpr auto kPROVIDER = "cassandra";
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
    ClioConfigDefinition cfg_{
 | 
			
		||||
@@ -53,6 +54,7 @@ protected:
 | 
			
		||||
        {"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
 | 
			
		||||
        {"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue(kKEYSPACE)},
 | 
			
		||||
        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue(kPROVIDER)},
 | 
			
		||||
        {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
        {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},
 | 
			
		||||
 
 | 
			
		||||
@@ -93,6 +93,7 @@ protected:
 | 
			
		||||
        {"database.cassandra.port", ConfigValue{ConfigType::Integer}.optional()},
 | 
			
		||||
        {"database.cassandra.keyspace",
 | 
			
		||||
         ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
 | 
			
		||||
        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
        {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
        {"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.max_write_requests_outstanding", ConfigValue{ConfigType::Integer}.defaultValue(10'000)},
 | 
			
		||||
 
 | 
			
		||||
@@ -103,6 +103,7 @@ protected:
 | 
			
		||||
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendHost)},
 | 
			
		||||
         {"database.cassandra.keyspace",
 | 
			
		||||
          ConfigValue{ConfigType::String}.defaultValue(TestGlobals::instance().backendKeyspace)},
 | 
			
		||||
         {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
         {"database.cassandra.replication_factor", ConfigValue{ConfigType::Integer}.defaultValue(1)},
 | 
			
		||||
         {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.defaultValue(2)},
 | 
			
		||||
 
 | 
			
		||||
@@ -59,6 +59,7 @@ getParseSettingsConfig(boost::json::value val)
 | 
			
		||||
        {"database.cassandra.certificate", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
 | 
			
		||||
        {"database.cassandra.provider", ConfigValue{ConfigType::String}.defaultValue("cassandra")},
 | 
			
		||||
        {"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional()},
 | 
			
		||||
        {"database.cassandra.write_batch_size", ConfigValue{ConfigType::Integer}.defaultValue(20)},
 | 
			
		||||
        {"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional()},
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user