diff --git a/src/main/main.cpp b/src/main/main.cpp index 6776c19fe..8b3a02a8c 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -17,10 +17,10 @@ static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000; static void wait(boost::asio::steady_timer& timer, std::string const& reason) { - BOOST_LOG_TRIVIAL(info) << reason << ". Waiting"; + BOOST_LOG_TRIVIAL(info) << reason << ". Waiting then retrying"; timer.expires_after(WAIT_TIME); timer.wait(); - BOOST_LOG_TRIVIAL(info) << "Done"; + BOOST_LOG_TRIVIAL(info) << "Done waiting"; } static std::vector @@ -49,25 +49,26 @@ maybeDoNFTWrite( return doNFTWrite(nfts, backend, tag); } -static std::optional -doTryFetchTransaction( +static std::vector +doTryFetchTransactions( boost::asio::steady_timer& timer, Backend::CassandraBackend& backend, - ripple::uint256 const& hash, + std::vector const& hashes, boost::asio::yield_context& yield, std::uint32_t const attempts = 0) { try { - return backend.fetchTransaction(hash, yield); + return backend.fetchTransactions(hashes, yield); } catch (Backend::DatabaseTimeout const& e) { if (attempts >= MAX_RETRIES) throw e; - wait(timer, "Transaction read error"); - return doTryFetchTransaction(timer, backend, hash, yield, attempts + 1); + wait(timer, "Transactions read error"); + return doTryFetchTransactions( + timer, backend, hashes, yield, attempts + 1); } } @@ -158,6 +159,8 @@ doMigration( CassResult const* result = doTryGetTxPageResult(nftTxQuery, timer, backend); + std::vector txHashes; + // For each tx in page... CassIterator* txPageIterator = cass_iterator_from_result(result); while (cass_iterator_next(txPageIterator)) @@ -178,36 +181,35 @@ doMigration( "Could not retrieve hash from nf_token_transactions"); } - auto const txHash = ripple::uint256::fromVoid(buf); - auto const tx = - doTryFetchTransaction(timer, backend, txHash, yield); - if (!tx) - { - cass_iterator_free(txPageIterator); - cass_result_free(result); - cass_statement_free(nftTxQuery); - std::stringstream ss; - ss << "Could not fetch tx with hash " - << ripple::to_string(txHash); - throw std::runtime_error(ss.str()); - } - - // Not really sure how cassandra paging works, but we want to skip - // any transactions that were loaded since the migration started - if (tx->ledgerSequence > ledgerRange->maxSequence) - continue; - - ripple::STTx const sttx{ripple::SerialIter{ - tx->transaction.data(), tx->transaction.size()}}; - if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT) - continue; - - ripple::TxMeta const txMeta{ - sttx.getTransactionID(), tx->ledgerSequence, tx->metadata}; - toWrite.push_back( - std::get<1>(getNFTDataFromTx(txMeta, sttx)).value()); + txHashes.push_back(ripple::uint256::fromVoid(buf)); } + auto txs = doTryFetchTransactions(timer, backend, txHashes, yield); + txs.erase( + std::remove_if( + txs.begin(), + txs.end(), + [&ledgerRange](Backend::TransactionAndMetadata const& tx) { + if (tx.ledgerSequence > ledgerRange->maxSequence) + return true; + ripple::STTx const sttx{ripple::SerialIter{ + tx.transaction.data(), tx.transaction.size()}}; + return sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT; + }), + txs.end()); + + std::transform( + txs.begin(), + txs.end(), + toWrite.end(), + [](Backend::TransactionAndMetadata const& tx) { + ripple::STTx const sttx{ripple::SerialIter{ + tx.transaction.data(), tx.transaction.size()}}; + ripple::TxMeta const txMeta{ + sttx.getTransactionID(), tx.ledgerSequence, tx.metadata}; + return std::get<1>(getNFTDataFromTx(txMeta, sttx)).value(); + }); + toWrite = maybeDoNFTWrite(toWrite, backend, "TX"); morePages = cass_result_has_more_pages(result);