From 649ecf4eda57ef54c45de94da5eb19e1e3ffb092 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 14 Jul 2021 20:29:55 +0000 Subject: [PATCH] avoid wide rows with account_tx --- src/backend/CassandraBackend.cpp | 108 +++++++++++++++++++++++++++++-- src/backend/CassandraBackend.h | 51 +++------------ test.py | 5 +- 3 files changed, 113 insertions(+), 51 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 9f2fb5c6..c7719cd3 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -222,6 +222,9 @@ CassandraBackend::writeAccountTransactions( CassandraStatement statement(insertAccountTx_); auto& [account, lgrSeq, txnIdx, hash] = params.data; statement.bindBytes(account); + char firstByte = static_cast(lgrSeq >> 20); + statement.bindBytes(&firstByte, 1); + statement.bindIntTuple(lgrSeq, txnIdx); statement.bindBytes(hash); return statement; @@ -404,6 +407,101 @@ CassandraBackend::fetchAllTransactionHashesInLedger( << " milliseconds"; return hashes; } +std::pair< + std::vector, + std::optional> +CassandraBackend::fetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursorIn) const +{ + auto rng = fetchLedgerRange(); + if (!rng) + return {{}, {}}; + std::pair< + std::vector, + std::optional> + res; + auto cursor = cursorIn; + do + { + auto interim = doFetchAccountTransactions(account, limit, cursor); + for (auto& txn : interim.first) + { + res.first.push_back(txn); + } + res.second = cursor = interim.second; + limit -= interim.first.size(); + uint32_t seq = cursor->ledgerSequence; + seq = ((seq >> 20) - 1) << 20; + cursor->ledgerSequence = seq; + } while (res.first.size() < limit && + cursor->ledgerSequence >= rng->minSequence); + if (res.first.size() < limit) + res.second = {}; + return res; +} +std::pair< + std::vector, + std::optional> +CassandraBackend::doFetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursor) const +{ + BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; + CassandraStatement statement{selectAccountTx_}; + statement.bindBytes(account); + if (cursor) + { + char firstByte = static_cast(cursor->ledgerSequence >> 20); + statement.bindBytes(&firstByte, 1); + statement.bindIntTuple( + cursor->ledgerSequence, cursor->transactionIndex); + } + else + { + auto rng = fetchLedgerRange(); + if (!rng) + return {{}, {}}; + uint32_t max = rng->maxSequence; + char firstByte = static_cast(max >> 20); + statement.bindBytes(&firstByte, 1); + + statement.bindIntTuple(INT32_MAX, INT32_MAX); + } + statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + if (!result.hasResult()) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; + return {{}, {}}; + } + + std::vector hashes; + size_t numRows = result.numRows(); + std::optional retCursor; + + BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); + do + { + hashes.push_back(result.getUInt256()); + --numRows; + if (numRows == 0) + { + auto [lgrSeq, txnIdx] = result.getInt64Tuple(); + retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; + } + } while (result.nextRow()); + + BOOST_LOG_TRIVIAL(debug) + << "doAccountTx - populated hashes. num hashes = " << hashes.size(); + if (hashes.size()) + { + return {fetchTransactions(hashes), retCursor}; + } + return {{}, {}}; +} LedgerPage CassandraBackend::doFetchLedgerPage( @@ -996,10 +1094,11 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" - << " ( account blob, seq_idx tuple, " + << " ( account blob, seq_first_byte blob, seq_idx " + "tuple, " " hash blob, " "PRIMARY KEY " - "(account, seq_idx)) WITH " + "((account,seq_first_byte), seq_idx)) WITH " "CLUSTERING ORDER BY (seq_idx desc)" << " AND default_time_to_live = " << std::to_string(ttl); @@ -1138,14 +1237,15 @@ CassandraBackend::open(bool readOnly) query.str(""); query << " INSERT INTO " << tablePrefix << "account_tx" - << " (account, seq_idx, hash) " - << " VALUES (?,?,?)"; + << " (account, seq_first_byte, seq_idx, hash) " + << " VALUES (?,?,?,?)"; if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; query.str(""); query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " + << " AND seq_first_byte = ? " << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 92ea0889..f62343f4 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -655,49 +655,14 @@ public: fetchAccountTransactions( ripple::AccountID const& account, std::uint32_t limit, - std::optional const& cursor) const override - { - BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; - CassandraStatement statement{selectAccountTx_}; - statement.bindBytes(account); - if (cursor) - statement.bindIntTuple( - cursor->ledgerSequence, cursor->transactionIndex); - else - statement.bindIntTuple(INT32_MAX, INT32_MAX); - statement.bindUInt(limit); - CassandraResult result = executeSyncRead(statement); - if (!result.hasResult()) - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; - return {{}, {}}; - } - - std::vector hashes; - size_t numRows = result.numRows(); - bool returnCursor = numRows == limit; - std::optional retCursor; - - BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); - do - { - hashes.push_back(result.getUInt256()); - --numRows; - if (numRows == 0 && returnCursor) - { - auto [lgrSeq, txnIdx] = result.getInt64Tuple(); - retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; - } - } while (result.nextRow()); - - BOOST_LOG_TRIVIAL(debug) - << "doAccountTx - populated hashes. num hashes = " << hashes.size(); - if (hashes.size()) - { - return {fetchTransactions(hashes), retCursor}; - } - return {{}, {}}; - } + std::optional const& cursor) const override; + std::pair< + std::vector, + std::optional> + doFetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursor) const; bool doFinishWrites() const override diff --git a/test.py b/test.py index 07a45cf6..36126d8a 100755 --- a/test.py +++ b/test.py @@ -347,9 +347,6 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No else: print(res) break - if numCalls > numPages: - print("breaking") - break return results except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -1043,7 +1040,7 @@ def run(args): args.account = res["transaction"]["Account"] print("starting") res = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages))) + account_tx_full(args.ip, args.port, args.account, args.binary,None,None)) rng = getMinAndMax(res) print(len(res["transactions"])) print(args.account)