attempt to speed up tx loading

This commit is contained in:
ledhed2222
2023-05-05 01:39:20 -04:00
parent 646ca2c6d9
commit d4e2240126

View File

@@ -17,10 +17,10 @@ static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000;
static void
wait(boost::asio::steady_timer& timer, std::string const& reason)
{
BOOST_LOG_TRIVIAL(info) << reason << ". Waiting";
BOOST_LOG_TRIVIAL(info) << reason << ". Waiting then retrying";
timer.expires_after(WAIT_TIME);
timer.wait();
BOOST_LOG_TRIVIAL(info) << "Done";
BOOST_LOG_TRIVIAL(info) << "Done waiting";
}
static std::vector<NFTsData>
@@ -49,25 +49,26 @@ maybeDoNFTWrite(
return doNFTWrite(nfts, backend, tag);
}
static std::optional<Backend::TransactionAndMetadata>
doTryFetchTransaction(
static std::vector<Backend::TransactionAndMetadata>
doTryFetchTransactions(
boost::asio::steady_timer& timer,
Backend::CassandraBackend& backend,
ripple::uint256 const& hash,
std::vector<ripple::uint256> const& hashes,
boost::asio::yield_context& yield,
std::uint32_t const attempts = 0)
{
try
{
return backend.fetchTransaction(hash, yield);
return backend.fetchTransactions(hashes, yield);
}
catch (Backend::DatabaseTimeout const& e)
{
if (attempts >= MAX_RETRIES)
throw e;
wait(timer, "Transaction read error");
return doTryFetchTransaction(timer, backend, hash, yield, attempts + 1);
wait(timer, "Transactions read error");
return doTryFetchTransactions(
timer, backend, hashes, yield, attempts + 1);
}
}
@@ -158,6 +159,8 @@ doMigration(
CassResult const* result =
doTryGetTxPageResult(nftTxQuery, timer, backend);
std::vector<ripple::uint256> txHashes;
// For each tx in page...
CassIterator* txPageIterator = cass_iterator_from_result(result);
while (cass_iterator_next(txPageIterator))
@@ -178,36 +181,35 @@ doMigration(
"Could not retrieve hash from nf_token_transactions");
}
auto const txHash = ripple::uint256::fromVoid(buf);
auto const tx =
doTryFetchTransaction(timer, backend, txHash, yield);
if (!tx)
{
cass_iterator_free(txPageIterator);
cass_result_free(result);
cass_statement_free(nftTxQuery);
std::stringstream ss;
ss << "Could not fetch tx with hash "
<< ripple::to_string(txHash);
throw std::runtime_error(ss.str());
}
// Not really sure how cassandra paging works, but we want to skip
// any transactions that were loaded since the migration started
if (tx->ledgerSequence > ledgerRange->maxSequence)
continue;
ripple::STTx const sttx{ripple::SerialIter{
tx->transaction.data(), tx->transaction.size()}};
if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT)
continue;
ripple::TxMeta const txMeta{
sttx.getTransactionID(), tx->ledgerSequence, tx->metadata};
toWrite.push_back(
std::get<1>(getNFTDataFromTx(txMeta, sttx)).value());
txHashes.push_back(ripple::uint256::fromVoid(buf));
}
auto txs = doTryFetchTransactions(timer, backend, txHashes, yield);
txs.erase(
std::remove_if(
txs.begin(),
txs.end(),
[&ledgerRange](Backend::TransactionAndMetadata const& tx) {
if (tx.ledgerSequence > ledgerRange->maxSequence)
return true;
ripple::STTx const sttx{ripple::SerialIter{
tx.transaction.data(), tx.transaction.size()}};
return sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT;
}),
txs.end());
std::transform(
txs.begin(),
txs.end(),
toWrite.end(),
[](Backend::TransactionAndMetadata const& tx) {
ripple::STTx const sttx{ripple::SerialIter{
tx.transaction.data(), tx.transaction.size()}};
ripple::TxMeta const txMeta{
sttx.getTransactionID(), tx.ledgerSequence, tx.metadata};
return std::get<1>(getNFTDataFromTx(txMeta, sttx)).value();
});
toWrite = maybeDoNFTWrite(toWrite, backend, "TX");
morePages = cass_result_has_more_pages(result);