Merge pull request #6 from ledhed2222/attempt-to-speed-up-tx-loading

attempt to speed up tx loading
This commit is contained in:
ledhed2222
2023-05-05 02:05:13 -04:00
committed by GitHub
2 changed files with 24 additions and 30 deletions

View File

@@ -550,7 +550,7 @@ CassandraBackend::fetchTransactions(
std::vector<TransactionAndMetadata> results{numHashes};
std::vector<std::shared_ptr<ReadCallbackData<result_type>>> cbs;
cbs.reserve(numHashes);
auto timeDiff = util::timed([&]() {
[[maybe_unused]] auto timeDiff = util::timed([&]() {
for (std::size_t i = 0; i < hashes.size(); ++i)
{
CassandraStatement statement{selectTransaction_};
@@ -580,9 +580,9 @@ CassandraBackend::fetchTransactions(
throw DatabaseTimeout();
}
log_.debug() << "Fetched " << numHashes
<< " transactions from Cassandra in " << timeDiff
<< " milliseconds";
// log_.debug() << "Fetched " << numHashes
// << " transactions from Cassandra in " << timeDiff
// << " milliseconds";
return results;
}

View File

@@ -17,10 +17,10 @@ static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000;
static void
wait(boost::asio::steady_timer& timer, std::string const& reason)
{
BOOST_LOG_TRIVIAL(info) << reason << ". Waiting";
BOOST_LOG_TRIVIAL(info) << reason << ". Waiting then retrying";
timer.expires_after(WAIT_TIME);
timer.wait();
BOOST_LOG_TRIVIAL(info) << "Done";
BOOST_LOG_TRIVIAL(info) << "Done waiting";
}
static std::vector<NFTsData>
@@ -49,25 +49,26 @@ maybeDoNFTWrite(
return doNFTWrite(nfts, backend, tag);
}
static std::optional<Backend::TransactionAndMetadata>
doTryFetchTransaction(
static std::vector<Backend::TransactionAndMetadata>
doTryFetchTransactions(
boost::asio::steady_timer& timer,
Backend::CassandraBackend& backend,
ripple::uint256 const& hash,
std::vector<ripple::uint256> const& hashes,
boost::asio::yield_context& yield,
std::uint32_t const attempts = 0)
{
try
{
return backend.fetchTransaction(hash, yield);
return backend.fetchTransactions(hashes, yield);
}
catch (Backend::DatabaseTimeout const& e)
{
if (attempts >= MAX_RETRIES)
throw e;
wait(timer, "Transaction read error");
return doTryFetchTransaction(timer, backend, hash, yield, attempts + 1);
wait(timer, "Transactions read error");
return doTryFetchTransactions(
timer, backend, hashes, yield, attempts + 1);
}
}
@@ -158,6 +159,8 @@ doMigration(
CassResult const* result =
doTryGetTxPageResult(nftTxQuery, timer, backend);
std::vector<ripple::uint256> txHashes;
// For each tx in page...
CassIterator* txPageIterator = cass_iterator_from_result(result);
while (cass_iterator_next(txPageIterator))
@@ -178,32 +181,23 @@ doMigration(
"Could not retrieve hash from nf_token_transactions");
}
auto const txHash = ripple::uint256::fromVoid(buf);
auto const tx =
doTryFetchTransaction(timer, backend, txHash, yield);
if (!tx)
{
cass_iterator_free(txPageIterator);
cass_result_free(result);
cass_statement_free(nftTxQuery);
std::stringstream ss;
ss << "Could not fetch tx with hash "
<< ripple::to_string(txHash);
throw std::runtime_error(ss.str());
}
txHashes.push_back(ripple::uint256::fromVoid(buf));
}
// Not really sure how cassandra paging works, but we want to skip
// any transactions that were loaded since the migration started
if (tx->ledgerSequence > ledgerRange->maxSequence)
auto txs = doTryFetchTransactions(timer, backend, txHashes, yield);
for (auto const& tx : txs)
{
if (tx.ledgerSequence > ledgerRange->maxSequence)
continue;
ripple::STTx const sttx{ripple::SerialIter{
tx->transaction.data(), tx->transaction.size()}};
tx.transaction.data(), tx.transaction.size()}};
if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT)
continue;
ripple::TxMeta const txMeta{
sttx.getTransactionID(), tx->ledgerSequence, tx->metadata};
sttx.getTransactionID(), tx.ledgerSequence, tx.metadata};
toWrite.push_back(
std::get<1>(getNFTDataFromTx(txMeta, sttx)).value());
}