refactor account_tx (#70)

Revert compound partition key to avoid large rows.
Nathan Nichols
2022-01-03 14:47:56 -06:00
committed by GitHub
parent c7e31aff56
commit 9c93e948f5
2 changed files with 82 additions and 99 deletions

@@ -246,9 +246,6 @@ CassandraBackend::writeAccountTransactions(
CassandraStatement statement(insertAccountTx_);
auto& [account, lgrSeq, txnIdx, hash] = params.data;
statement.bindNextBytes(account);
uint32_t index = lgrSeq >> 20 << 20;
statement.bindNextUInt(index);
statement.bindNextIntTuple(lgrSeq, txnIdx);
statement.bindNextBytes(hash);
return statement;
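
Note on the removed lines above: idx was the ledger sequence with its low 20 bits cleared, i.e. lgrSeq >> 20 << 20 rounds down to a multiple of 2^20 (1,048,576), so writes were bucketed into roughly million-ledger partitions. A standalone illustration of that arithmetic (not project code):

    #include <cstdint>
    #include <iostream>

    int main()
    {
        // Clearing the low 20 bits buckets a ledger sequence into a 2^20-wide range.
        uint32_t lgrSeq = 68'000'123;
        uint32_t bucket = lgrSeq >> 20 << 20;   // equivalent to lgrSeq & ~0xFFFFFu
        std::cout << bucket << "\n";            // prints 67108864, i.e. 64 * 2^20
    }

After the revert, only the account, the (ledger sequence, transaction index) tuple and the hash are bound, matching the three-placeholder INSERT prepared further down.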
@@ -449,106 +446,71 @@ CassandraBackend::fetchAccountTransactions(
auto rng = fetchLedgerRange();
if (!rng)
return {{}, {}};
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
res;
auto keylet = ripple::keylet::account(account);
std::vector<ripple::uint256> hashes;
auto cursor = cursorIn;
CassandraStatement statement = [this, forward]() {
if (forward)
return CassandraStatement{selectAccountTxForward_};
else
return CassandraStatement{selectAccountTx_};
}();
statement.bindNextBytes(account);
if (cursor)
{
statement.bindNextIntTuple(
cursor->ledgerSequence, cursor->transactionIndex);
BOOST_LOG_TRIVIAL(debug)
<< " account = " << ripple::strHex(account)
<< " tuple = " << cursor->ledgerSequence << " : "
<< cursor->transactionIndex;
}
else
{
int seq = forward ? rng->minSequence : rng->maxSequence;
int placeHolder = forward ? 0 : std::numeric_limits<uint32_t>::max();
statement.bindNextIntTuple(placeHolder, placeHolder);
BOOST_LOG_TRIVIAL(debug)
<< " account = " << ripple::strHex(account)
<< " idx = " << seq << " tuple = " << placeHolder;
}
statement.bindNextUInt(limit);
CassandraResult result = executeSyncRead(statement);
if (!result.hasResult())
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned";
return {};
}
std::vector<ripple::uint256> hashes = {};
auto numRows = result.numRows();
BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows);
do
{
hashes.push_back(result.getUInt256());
if (--numRows == 0)
{
CassandraStatement statement = [this, forward]() {
if (forward)
return CassandraStatement{selectAccountTxForward_};
else
return CassandraStatement{selectAccountTx_};
}();
statement.bindNextBytes(account);
if (cursor)
{
statement.bindNextUInt(cursor->ledgerSequence >> 20 << 20);
statement.bindNextIntTuple(
cursor->ledgerSequence, cursor->transactionIndex);
BOOST_LOG_TRIVIAL(debug)
<< " account = " << ripple::strHex(account)
<< " idx = " << (cursor->ledgerSequence >> 20 << 20)
<< " tuple = " << cursor->ledgerSequence << " : "
<< cursor->transactionIndex;
}
else
{
int seq = forward ? rng->minSequence : rng->maxSequence;
statement.bindNextUInt(seq >> 20 << 20);
int placeHolder = forward ? 0 : INT32_MAX;
statement.bindNextIntTuple(placeHolder, placeHolder);
BOOST_LOG_TRIVIAL(debug)
<< " account = " << ripple::strHex(account)
<< " idx = " << seq << " tuple = " << placeHolder;
}
uint32_t adjustedLimit = limit - hashes.size();
statement.bindNextUInt(adjustedLimit);
CassandraResult result = executeSyncRead(statement);
if (!result.hasResult())
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned";
break;
}
size_t numRows = result.numRows();
BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows);
do
{
hashes.push_back(result.getUInt256());
--numRows;
if (numRows == 0)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor";
auto [lgrSeq, txnIdx] = result.getInt64Tuple();
cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx};
if (forward)
++cursor->transactionIndex;
}
} while (result.nextRow());
}
if (hashes.size() < limit)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " less than limit";
uint32_t seq = cursor->ledgerSequence;
seq = seq >> 20;
BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor";
auto [lgrSeq, txnIdx] = result.getInt64Tuple();
cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx};
if (forward)
seq += 1;
else
seq -= 1;
seq = seq << 20;
cursor->ledgerSequence = seq;
cursor->transactionIndex = forward ? 0 : INT32_MAX;
BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back";
CassandraStatement statement{selectObject_};
statement.bindNextBytes(keylet.key);
statement.bindNextInt(seq);
CassandraResult result = executeSyncRead(statement);
if (!result)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " account no longer exists";
cursor = {};
break;
}
++cursor->transactionIndex;
}
} while (hashes.size() < limit &&
cursor->ledgerSequence >= rng->minSequence);
} while (result.nextRow());
auto txns = fetchTransactions(hashes);
BOOST_LOG_TRIVIAL(debug) << __func__ << "txns = " << txns.size();
if (txns.size() >= limit)
if (txns.size() == limit)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " returning cursor";
return {txns, cursor};
}
return {txns, {}};
}
std::optional<ripple::uint256>
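
Net effect of this hunk: the old code re-ran the query once per 2^20-ledger bucket, shrank the limit by the hashes already gathered, and, when a bucket came back short, probed selectObject_ with the account keylet before walking back another bucket; the new code issues a single range query on (account, seq_idx) and derives the next cursor from the last row it reads. A simplified stand-alone sketch of that paging step (Row, pageOnce and the field names are illustrative stand-ins, not the backend's API):

    #include <cstdint>
    #include <optional>
    #include <string>
    #include <utility>
    #include <vector>

    struct Cursor
    {
        uint32_t ledgerSequence;
        uint32_t transactionIndex;
    };

    // Row stands in for one result row of the account_tx query.
    struct Row
    {
        std::string hash;
        int64_t lgrSeq;
        int64_t txnIdx;
    };

    // Collect the page of hashes and derive the cursor from the last row,
    // mirroring the do/while over result rows above.
    std::pair<std::vector<std::string>, std::optional<Cursor>>
    pageOnce(std::vector<Row> const& rows, size_t limit, bool forward)
    {
        std::vector<std::string> hashes;
        std::optional<Cursor> cursor;
        for (size_t i = 0; i < rows.size(); ++i)
        {
            hashes.push_back(rows[i].hash);
            if (i + 1 == rows.size())
            {
                // Last row returned: resume here next time. Going forward the
                // index is bumped so the same transaction is not sent twice.
                cursor = Cursor{
                    (uint32_t)rows[i].lgrSeq, (uint32_t)rows[i].txnIdx};
                if (forward)
                    ++cursor->transactionIndex;
            }
        }
        // A cursor is only reported when the page came back full; a short
        // page means the history in the requested range is exhausted.
        if (hashes.size() == limit)
            return {hashes, cursor};
        return {hashes, std::nullopt};
    }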
@@ -1084,11 +1046,11 @@ CassandraBackend::open(bool readOnly)
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, idx int, seq_idx "
<< " ( account blob, seq_idx "
"tuple<bigint, bigint>, "
" hash blob, "
"PRIMARY KEY "
"((account,idx), seq_idx)) WITH "
"(account, seq_idx)) WITH "
"CLUSTERING ORDER BY (seq_idx desc)"
<< " AND default_time_to_live = " << std::to_string(ttl);
@@ -1237,22 +1199,20 @@ CassandraBackend::open(bool readOnly)
query.str("");
query << " INSERT INTO " << tablePrefix << "account_tx"
<< " (account, idx, seq_idx, hash) "
<< " VALUES (?,?,?,?)";
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND idx = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND idx = ? "
<< " AND seq_idx >= ? ORDER BY seq_idx ASC LIMIT ?";
if (!selectAccountTxForward_.prepareStatement(query, session_.get()))
continue;
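
For readability, the statements prepared against that schema after this change, assembled from the fragments above (prefix omitted):

    INSERT INTO account_tx (account, seq_idx, hash) VALUES (?, ?, ?)

    SELECT hash, seq_idx FROM account_tx
     WHERE account = ? AND seq_idx < ? LIMIT ?

    SELECT hash, seq_idx FROM account_tx
     WHERE account = ? AND seq_idx >= ? ORDER BY seq_idx ASC LIMIT ?

The second backs selectAccountTx_ (newest first, riding the descending clustering order) and the third backs selectAccountTxForward_; the placeholder after account is the (ledger sequence, transaction index) tuple that fetchAccountTransactions binds as the paging bound.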

@@ -75,10 +75,20 @@ doAccountTx(Context const& context)
auto minIndex = context.range.minSequence;
if (request.contains("ledger_index_min"))
{
if (!request.at("ledger_index_min").is_int64())
auto& min = request.at("ledger_index_min");
if (!min.is_int64())
return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMinNotNumber"};
minIndex = value_to<std::uint32_t>(request.at("ledger_index_min"));
if (min.as_int64() != -1)
{
if (context.range.maxSequence < min.as_int64() ||
context.range.minSequence > min.as_int64())
return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMaxOutOfRange"};
else
minIndex = value_to<std::uint32_t>(min);
}
if (forward && !cursor)
cursor = {minIndex, 0};
}
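
In the reworked ledger_index_min handling above, -1 keeps the server's own lower bound (rippled's account_tx convention for "no limit"), and any other value must fall inside the ledger range this server holds; note the error string reused here is ledgerSeqMaxOutOfRange even though it guards the minimum. The same check reappears for ledger_index_max in the next hunk. A small illustrative helper capturing the shared pattern (resolveBound and LedgerRange are stand-ins, not part of the handler):

    #include <cstdint>
    #include <optional>

    struct LedgerRange
    {
        uint32_t minSequence;
        uint32_t maxSequence;
    };

    // Returns the bound to use, or std::nullopt when the caller should
    // answer with rpcINVALID_PARAMS / an out-of-range error.
    std::optional<uint32_t>
    resolveBound(int64_t requested, uint32_t fallback, LedgerRange const& range)
    {
        if (requested == -1)
            return fallback;  // -1 means "use the server's own bound"
        if (requested < range.minSequence || requested > range.maxSequence)
            return std::nullopt;
        return static_cast<uint32_t>(requested);
    }

For example, with a server range of 1000-2000, a ledger_index_min of -1 leaves the bound at 1000, 1500 narrows it to 1500, and 500 or 2500 yields the out-of-range error.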
@@ -86,16 +96,27 @@ doAccountTx(Context const& context)
auto maxIndex = context.range.maxSequence;
if (request.contains("ledger_index_max"))
{
if (!request.at("ledger_index_max").is_int64())
auto& max = request.at("ledger_index_max");
if (!max.is_int64())
return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMaxNotNumber"};
maxIndex = value_to<std::uint32_t>(request.at("ledger_index_max"));
if (max.as_int64() != -1)
{
if (context.range.maxSequence < max.as_int64() ||
context.range.minSequence > max.as_int64())
return Status{Error::rpcINVALID_PARAMS, "ledgerSeqMaxOutOfRange"};
else
maxIndex = value_to<std::uint32_t>(max);
}
if (minIndex > maxIndex)
return Status{Error::rpcINVALID_PARAMS, "invalidIndex"};
if (!forward && !cursor)
cursor = {maxIndex, INT32_MAX};
}
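
The ledger_index_max branch just above mirrors the minimum: -1 keeps the server's upper bound, out-of-range values return ledgerSeqMaxOutOfRange, and a minimum greater than the maximum is rejected as invalidIndex. When the request carries no marker, the handler seeds the cursor at the edge of the requested window. A stand-in sketch of those defaults (Cursor here is illustrative, not the handler's type):

    #include <cstdint>

    struct Cursor
    {
        uint32_t ledgerSequence;
        uint32_t transactionIndex;
    };

    // Forward paging starts at the oldest transaction in the window;
    // backward paging starts past the newest possible transaction index.
    Cursor
    defaultCursor(bool forward, uint32_t minIndex, uint32_t maxIndex)
    {
        return forward ? Cursor{minIndex, 0} : Cursor{maxIndex, INT32_MAX};
    }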
if (request.contains("ledger_index"))
{
if (!request.at("ledger_index").is_int64())
@@ -104,6 +125,7 @@ doAccountTx(Context const& context)
auto ledgerIndex = value_to<uint32_t>(request.at("ledger_index"));
maxIndex = minIndex = ledgerIndex;
}
if (request.contains("ledger_hash"))
{
if (!request.at("ledger_hash").is_string())
@@ -118,6 +140,7 @@ doAccountTx(Context const& context)
auto lgrInfo = context.backend->fetchLedgerByHash(ledgerHash);
maxIndex = minIndex = lgrInfo->seq;
}
if (!cursor)
{
if (forward)