avoid wide rows with account_tx

This commit is contained in:
CJ Cobb
2021-07-14 20:29:55 +00:00
parent cb2975dd41
commit 649ecf4eda
3 changed files with 113 additions and 51 deletions

View File

@@ -222,6 +222,9 @@ CassandraBackend::writeAccountTransactions(
CassandraStatement statement(insertAccountTx_);
auto& [account, lgrSeq, txnIdx, hash] = params.data;
statement.bindBytes(account);
char firstByte = static_cast<char>(lgrSeq >> 20);
statement.bindBytes(&firstByte, 1);
statement.bindIntTuple(lgrSeq, txnIdx);
statement.bindBytes(hash);
return statement;
@@ -404,6 +407,101 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
<< " milliseconds";
return hashes;
}
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
CassandraBackend::fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursorIn) const
{
auto rng = fetchLedgerRange();
if (!rng)
return {{}, {}};
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
res;
auto cursor = cursorIn;
do
{
auto interim = doFetchAccountTransactions(account, limit, cursor);
for (auto& txn : interim.first)
{
res.first.push_back(txn);
}
res.second = cursor = interim.second;
limit -= interim.first.size();
uint32_t seq = cursor->ledgerSequence;
seq = ((seq >> 20) - 1) << 20;
cursor->ledgerSequence = seq;
} while (res.first.size() < limit &&
cursor->ledgerSequence >= rng->minSequence);
if (res.first.size() < limit)
res.second = {};
return res;
}
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
CassandraBackend::doFetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor) const
{
BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx";
CassandraStatement statement{selectAccountTx_};
statement.bindBytes(account);
if (cursor)
{
char firstByte = static_cast<char>(cursor->ledgerSequence >> 20);
statement.bindBytes(&firstByte, 1);
statement.bindIntTuple(
cursor->ledgerSequence, cursor->transactionIndex);
}
else
{
auto rng = fetchLedgerRange();
if (!rng)
return {{}, {}};
uint32_t max = rng->maxSequence;
char firstByte = static_cast<char>(max >> 20);
statement.bindBytes(&firstByte, 1);
statement.bindIntTuple(INT32_MAX, INT32_MAX);
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
if (!result.hasResult())
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned";
return {{}, {}};
}
std::vector<ripple::uint256> hashes;
size_t numRows = result.numRows();
std::optional<AccountTransactionsCursor> retCursor;
BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows);
do
{
hashes.push_back(result.getUInt256());
--numRows;
if (numRows == 0)
{
auto [lgrSeq, txnIdx] = result.getInt64Tuple();
retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx};
}
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< "doAccountTx - populated hashes. num hashes = " << hashes.size();
if (hashes.size())
{
return {fetchTransactions(hashes), retCursor};
}
return {{}, {}};
}
LedgerPage
CassandraBackend::doFetchLedgerPage(
@@ -996,10 +1094,11 @@ CassandraBackend::open(bool readOnly)
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
<< " ( account blob, seq_first_byte blob, seq_idx "
"tuple<bigint, bigint>, "
" hash blob, "
"PRIMARY KEY "
"(account, seq_idx)) WITH "
"((account,seq_first_byte), seq_idx)) WITH "
"CLUSTERING ORDER BY (seq_idx desc)"
<< " AND default_time_to_live = " << std::to_string(ttl);
@@ -1138,14 +1237,15 @@ CassandraBackend::open(bool readOnly)
query.str("");
query << " INSERT INTO " << tablePrefix << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
<< " (account, seq_first_byte, seq_idx, hash) "
<< " VALUES (?,?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND seq_first_byte = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;

View File

@@ -655,49 +655,14 @@ public:
fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor) const override
{
BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx";
CassandraStatement statement{selectAccountTx_};
statement.bindBytes(account);
if (cursor)
statement.bindIntTuple(
cursor->ledgerSequence, cursor->transactionIndex);
else
statement.bindIntTuple(INT32_MAX, INT32_MAX);
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
if (!result.hasResult())
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned";
return {{}, {}};
}
std::vector<ripple::uint256> hashes;
size_t numRows = result.numRows();
bool returnCursor = numRows == limit;
std::optional<AccountTransactionsCursor> retCursor;
BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows);
do
{
hashes.push_back(result.getUInt256());
--numRows;
if (numRows == 0 && returnCursor)
{
auto [lgrSeq, txnIdx] = result.getInt64Tuple();
retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx};
}
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< "doAccountTx - populated hashes. num hashes = " << hashes.size();
if (hashes.size())
{
return {fetchTransactions(hashes), retCursor};
}
return {{}, {}};
}
std::optional<AccountTransactionsCursor> const& cursor) const override;
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
doFetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor) const;
bool
doFinishWrites() const override