diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 12d2741e..c581cd41 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -246,9 +246,6 @@ CassandraBackend::writeAccountTransactions( CassandraStatement statement(insertAccountTx_); auto& [account, lgrSeq, txnIdx, hash] = params.data; statement.bindNextBytes(account); - uint32_t index = lgrSeq >> 20 << 20; - statement.bindNextUInt(index); - statement.bindNextIntTuple(lgrSeq, txnIdx); statement.bindNextBytes(hash); return statement; @@ -449,106 +446,71 @@ CassandraBackend::fetchAccountTransactions( auto rng = fetchLedgerRange(); if (!rng) return {{}, {}}; - std::pair< - std::vector, - std::optional> - res; + auto keylet = ripple::keylet::account(account); - std::vector hashes; auto cursor = cursorIn; + + CassandraStatement statement = [this, forward]() { + if (forward) + return CassandraStatement{selectAccountTxForward_}; + else + return CassandraStatement{selectAccountTx_}; + }(); + + statement.bindNextBytes(account); + if (cursor) + { + statement.bindNextIntTuple( + cursor->ledgerSequence, cursor->transactionIndex); + BOOST_LOG_TRIVIAL(debug) + << " account = " << ripple::strHex(account) + << " tuple = " << cursor->ledgerSequence << " : " + << cursor->transactionIndex; + } + else + { + int seq = forward ? rng->minSequence : rng->maxSequence; + int placeHolder = forward ? 0 : std::numeric_limits::max(); + + statement.bindNextIntTuple(placeHolder, placeHolder); + BOOST_LOG_TRIVIAL(debug) + << " account = " << ripple::strHex(account) + << " idx = " << seq << " tuple = " << placeHolder; + } + statement.bindNextUInt(limit); + + CassandraResult result = executeSyncRead(statement); + if (!result.hasResult()) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; + return {}; + } + + std::vector hashes = {}; + auto numRows = result.numRows(); + BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); do { + hashes.push_back(result.getUInt256()); + if (--numRows == 0) { - CassandraStatement statement = [this, forward]() { - if (forward) - return CassandraStatement{selectAccountTxForward_}; - else - return CassandraStatement{selectAccountTx_}; - }(); - statement.bindNextBytes(account); - if (cursor) - { - statement.bindNextUInt(cursor->ledgerSequence >> 20 << 20); - statement.bindNextIntTuple( - cursor->ledgerSequence, cursor->transactionIndex); - BOOST_LOG_TRIVIAL(debug) - << " account = " << ripple::strHex(account) - << " idx = " << (cursor->ledgerSequence >> 20 << 20) - << " tuple = " << cursor->ledgerSequence << " : " - << cursor->transactionIndex; - } - else - { - int seq = forward ? rng->minSequence : rng->maxSequence; - statement.bindNextUInt(seq >> 20 << 20); - int placeHolder = forward ? 0 : INT32_MAX; - - statement.bindNextIntTuple(placeHolder, placeHolder); - BOOST_LOG_TRIVIAL(debug) - << " account = " << ripple::strHex(account) - << " idx = " << seq << " tuple = " << placeHolder; - } - uint32_t adjustedLimit = limit - hashes.size(); - statement.bindNextUInt(adjustedLimit); - CassandraResult result = executeSyncRead(statement); - if (!result.hasResult()) - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; - break; - } - - size_t numRows = result.numRows(); - - BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); - do - { - hashes.push_back(result.getUInt256()); - --numRows; - if (numRows == 0) - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor"; - auto [lgrSeq, txnIdx] = result.getInt64Tuple(); - cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; - if (forward) - ++cursor->transactionIndex; - } - } while (result.nextRow()); - } - if (hashes.size() < limit) - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " less than limit"; - uint32_t seq = cursor->ledgerSequence; - seq = seq >> 20; + BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor"; + auto [lgrSeq, txnIdx] = result.getInt64Tuple(); + cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; if (forward) - seq += 1; - else - seq -= 1; - seq = seq << 20; - cursor->ledgerSequence = seq; - cursor->transactionIndex = forward ? 0 : INT32_MAX; - BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back"; - CassandraStatement statement{selectObject_}; - statement.bindNextBytes(keylet.key); - statement.bindNextInt(seq); - CassandraResult result = executeSyncRead(statement); - if (!result) - { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " account no longer exists"; - cursor = {}; - break; - } + ++cursor->transactionIndex; } - } while (hashes.size() < limit && - cursor->ledgerSequence >= rng->minSequence); + } while (result.nextRow()); auto txns = fetchTransactions(hashes); BOOST_LOG_TRIVIAL(debug) << __func__ << "txns = " << txns.size(); - if (txns.size() >= limit) + + if (txns.size() == limit) { BOOST_LOG_TRIVIAL(debug) << __func__ << " returning cursor"; return {txns, cursor}; } + return {txns, {}}; } std::optional @@ -1084,11 +1046,11 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" - << " ( account blob, idx int, seq_idx " + << " ( account blob, seq_idx " "tuple, " " hash blob, " "PRIMARY KEY " - "((account,idx), seq_idx)) WITH " + "(account, seq_idx)) WITH " "CLUSTERING ORDER BY (seq_idx desc)" << " AND default_time_to_live = " << std::to_string(ttl); @@ -1237,22 +1199,20 @@ CassandraBackend::open(bool readOnly) query.str(""); query << " INSERT INTO " << tablePrefix << "account_tx" - << " (account, idx, seq_idx, hash) " - << " VALUES (?,?,?,?)"; + << " (account, seq_idx, hash) " + << " VALUES (?,?,?)"; if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; query.str(""); query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " - << " AND idx = ? " << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; query.str(""); query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " - << " AND idx = ? " << " AND seq_idx >= ? ORDER BY seq_idx ASC LIMIT ?"; if (!selectAccountTxForward_.prepareStatement(query, session_.get())) continue; diff --git a/src/rpc/handlers/AccountTx.cpp b/src/rpc/handlers/AccountTx.cpp index e55fbac4..bd89c95c 100644 --- a/src/rpc/handlers/AccountTx.cpp +++ b/src/rpc/handlers/AccountTx.cpp @@ -75,10 +75,20 @@ doAccountTx(Context const& context) auto minIndex = context.range.minSequence; if (request.contains("ledger_index_min")) { - if (!request.at("ledger_index_min").is_int64()) + auto& min = request.at("ledger_index_min"); + + if (!min.is_int64()) return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMinNotNumber"}; - minIndex = value_to(request.at("ledger_index_min")); + if (min.as_int64() != -1) + { + if (context.range.maxSequence < min.as_int64() || + context.range.minSequence > min.as_int64()) + return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMaxOutOfRange"}; + else + minIndex = value_to(min); + } + if (forward && !cursor) cursor = {minIndex, 0}; } @@ -86,16 +96,27 @@ doAccountTx(Context const& context) auto maxIndex = context.range.maxSequence; if (request.contains("ledger_index_max")) { - if (!request.at("ledger_index_max").is_int64()) + auto& max = request.at("ledger_index_max"); + + if (!max.is_int64()) return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMaxNotNumber"}; - maxIndex = value_to(request.at("ledger_index_max")); + if (max.as_int64() != -1) + { + if (context.range.maxSequence < max.as_int64() || + context.range.minSequence > max.as_int64()) + return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMaxOutOfRange"}; + else + maxIndex = value_to(max); + } if (minIndex > maxIndex) return Status{Error::rpcINVALID_PARAMS, "invalidIndex"}; + if (!forward && !cursor) cursor = {maxIndex, INT32_MAX}; } + if (request.contains("ledger_index")) { if (!request.at("ledger_index").is_int64()) @@ -104,6 +125,7 @@ doAccountTx(Context const& context) auto ledgerIndex = value_to(request.at("ledger_index")); maxIndex = minIndex = ledgerIndex; } + if (request.contains("ledger_hash")) { if (!request.at("ledger_hash").is_string()) @@ -118,6 +140,7 @@ doAccountTx(Context const& context) auto lgrInfo = context.backend->fetchLedgerByHash(ledgerHash); maxIndex = minIndex = lgrInfo->seq; } + if (!cursor) { if (forward)