diff --git a/src/main/main.cpp b/src/main/main.cpp index 093d090a8..859ff2092 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -10,36 +10,112 @@ #include -void +static std::uint32_t const MAX_RETRIES = 5; +static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60); + +static void +wait(boost::asio::steady_timer& timer, std::string const reason) +{ + BOOST_LOG_TRIVIAL(info) << reason << ". Waiting"; + timer.expires_after(WAIT_TIME); + timer.wait(); + BOOST_LOG_TRIVIAL(info) << "Done"; +} + +static void doNFTWrite( std::vector& nfts, Backend::CassandraBackend& backend, std::string const tag) { - if (nfts.size() > 0) - { - BOOST_LOG_TRIVIAL(info) - << tag << ": About to write " << nfts.size() << " records..."; - backend.writeNFTs(std::move(nfts)); - backend.sync(); - BOOST_LOG_TRIVIAL(info) << tag << ": Done"; - } - else - BOOST_LOG_TRIVIAL(info) << tag << ": No records to write"; + if (nfts.size() <= 0) + return; + auto const size = nfts.size(); + backend.writeNFTs(std::move(nfts)); + backend.sync(); + BOOST_LOG_TRIVIAL(info) << tag << ": Wrote " << size << " records"; } -void +static std::optional +doTryFetchTransaction( + boost::asio::steady_timer& timer, + Backend::CassandraBackend& backend, + ripple::uint256 const& hash, + boost::asio::yield_context& yield, + std::uint32_t const attempts = 0) +{ + try + { + return backend.fetchTransaction(hash, yield); + } + catch (Backend::DatabaseTimeout const& e) + { + if (attempts >= MAX_RETRIES) + throw e; + + wait(timer, "Transaction read error"); + return doTryFetchTransaction(timer, backend, hash, yield, attempts + 1); + } +} + +static Backend::LedgerPage +doTryFetchLedgerPage( + boost::asio::steady_timer& timer, + Backend::CassandraBackend& backend, + std::optional const& cursor, + std::uint32_t const sequence, + boost::asio::yield_context& yield, + std::uint32_t const attempts = 0) +{ + try + { + return backend.fetchLedgerPage(cursor, sequence, 2000, false, yield); + } + catch (Backend::DatabaseTimeout const& e) + { + if (attempts >= MAX_RETRIES) + throw e; + + wait(timer, "Page read error"); + return doTryFetchLedgerPage( + timer, backend, cursor, sequence, yield, attempts + 1); + } +} + +static const CassResult* +doTryGetTxPageResult( + CassStatement* const query, + boost::asio::steady_timer& timer, + Backend::CassandraBackend& backend, + std::uint32_t const attempts = 0) +{ + CassFuture* fut = cass_session_execute(backend.cautionGetSession(), query); + CassResult const* result = cass_future_get_result(fut); + cass_future_free(fut); + + if (result != nullptr) + return result; + + if (attempts >= MAX_RETRIES) + throw std::runtime_error("Already retried too many times"); + + wait(timer, "Unexpected empty result from tx paging"); + return doTryGetTxPageResult(query, timer, backend, attempts + 1); +} + +static void doMigration( Backend::CassandraBackend& backend, boost::asio::steady_timer& timer, - boost::asio::yield_context yield) + boost::asio::yield_context& yield) { BOOST_LOG_TRIVIAL(info) << "Beginning migration"; + auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield); + /* * Step 0 - If we haven't downloaded the initial ledger yet, just short * circuit. */ - auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield); if (!ledgerRange) { BOOST_LOG_TRIVIAL(info) << "There is no data to migrate"; @@ -47,7 +123,7 @@ doMigration( } /* - * Step 1 - Look at all NFT transactions, recording in + * Step 1 - Look at all NFT transactions recorded in * `nf_token_transactions` and reload any NFTokenMint transactions. These * will contain the URI of any tokens that were minted after our start * sequence. We look at transactions for this step instead of directly at @@ -61,37 +137,13 @@ doMigration( cass_statement_set_paging_size(nftTxQuery, 1000); cass_bool_t morePages = cass_true; - std::uint32_t retries = 0; - // For all NFT txs, paginated in groups of 1000... while (morePages) { std::vector toWrite; - CassFuture* fut = - cass_session_execute(backend.cautionGetSession(), nftTxQuery); - CassResult const* result = cass_future_get_result(fut); - if (result == nullptr) - { - cass_future_free(fut); - if (retries > 5) - { - cass_statement_free(nftTxQuery); - throw std::runtime_error( - "Already retried 5 times on same page. Aborting"); - } - - BOOST_LOG_TRIVIAL(info) - << "Unexepcted empty result from nf_token_transactions, " - "waiting for a minute and trying again"; - timer.expires_after(std::chrono::seconds(60)); - timer.wait(); - BOOST_LOG_TRIVIAL(info) << "Done waiting"; - retries++; - continue; - } - - retries = 0; + CassResult const* result = + doTryGetTxPageResult(nftTxQuery, timer, backend); // For each tx in page... CassIterator* txPageIterator = cass_iterator_from_result(result); @@ -100,7 +152,7 @@ doMigration( cass_byte_t const* buf; std::size_t bufSize; - CassError rc = cass_value_get_bytes( + CassError const rc = cass_value_get_bytes( cass_row_get_column(cass_iterator_get_row(txPageIterator), 0), &buf, &bufSize); @@ -108,25 +160,27 @@ doMigration( { cass_iterator_free(txPageIterator); cass_result_free(result); - cass_future_free(fut); cass_statement_free(nftTxQuery); throw std::runtime_error( "Could not retrieve hash from nf_token_transactions"); } auto const txHash = ripple::uint256::fromVoid(buf); - auto const tx = backend.fetchTransaction(txHash, yield); + auto const tx = + doTryFetchTransaction(timer, backend, txHash, yield); if (!tx) { cass_iterator_free(txPageIterator); cass_result_free(result); - cass_future_free(fut); cass_statement_free(nftTxQuery); std::stringstream ss; ss << "Could not fetch tx with hash " << ripple::to_string(txHash); throw std::runtime_error(ss.str()); } + + // Not really sure how cassandra paging works, but we want to skip + // any transactions that were loaded since the migration started if (tx->ledgerSequence > ledgerRange->maxSequence) continue; @@ -141,7 +195,6 @@ doMigration( std::get<1>(getNFTDataFromTx(txMeta, sttx)).value()); } - // write what we have doNFTWrite(toWrite, backend, "TX"); morePages = cass_result_has_more_pages(result); @@ -149,10 +202,10 @@ doMigration( cass_statement_set_paging_state(nftTxQuery, result); cass_iterator_free(txPageIterator); cass_result_free(result); - cass_future_free(fut); } cass_statement_free(nftTxQuery); + BOOST_LOG_TRIVIAL(info) << "\nDone with transaction loading!\n"; /* * Step 2 - Pull every object from our initial ledger and load all NFTs @@ -165,20 +218,21 @@ doMigration( do { - auto const page = backend.fetchLedgerPage( - cursor, ledgerRange->minSequence, 2000, false, yield); + auto const page = doTryFetchLedgerPage( + timer, backend, cursor, ledgerRange->minSequence, yield); for (auto const& object : page.objects) { - std::string blobStr(object.blob.begin(), object.blob.end()); std::vector toWrite = getNFTDataFromObj( ledgerRange->minSequence, ripple::to_string(object.key), - blobStr); + std::string(object.blob.begin(), object.blob.end())); doNFTWrite(toWrite, backend, "OBJ"); } cursor = page.cursor; } while (cursor.has_value()); + BOOST_LOG_TRIVIAL(info) << "\nDone with object loading!\n"; + /* * Step 3 - Drop the old `issuer_nf_tokens` table, which is replaced by * `issuer_nf_tokens_v2`. Normally, we should probably not drop old tables @@ -191,19 +245,20 @@ doMigration( cass_statement_new(query.str().c_str(), 0); CassFuture* fut = cass_session_execute(backend.cautionGetSession(), issuerDropTableQuery); - CassError rc = cass_future_error_code(fut); + CassError const rc = cass_future_error_code(fut); cass_future_free(fut); cass_statement_free(issuerDropTableQuery); - if (rc != CASS_OK) - BOOST_LOG_TRIVIAL(warning) - << "Could not drop old issuer_nf_tokens table. If it still exists, " - "you should drop it yourself"; - backend.sync(); + if (rc != CASS_OK) + BOOST_LOG_TRIVIAL(warning) << "\nCould not drop old issuer_nf_tokens " + "table. If it still exists, " + "you should drop it yourself\n"; + else + BOOST_LOG_TRIVIAL(info) << "\nDropped old 'issuer_nf_tokens' table!\n"; BOOST_LOG_TRIVIAL(info) - << "Completed migration from " << ledgerRange->minSequence << " to " - << ledgerRange->maxSequence; + << "\nCompleted migration from " << ledgerRange->minSequence << " to " + << ledgerRange->maxSequence << "!\n"; } int @@ -232,14 +287,14 @@ main(int argc, char* argv[]) } boost::asio::io_context ioc; + boost::asio::steady_timer timer{ioc}; + auto workGuard = boost::asio::make_work_guard(ioc); auto backend = Backend::make_Backend(ioc, config); - boost::asio::steady_timer timer(ioc); - auto work = boost::asio::make_work_guard(ioc); boost::asio::spawn( - ioc, [&backend, &work, &timer](boost::asio::yield_context yield) { + ioc, [&backend, &workGuard, &timer](boost::asio::yield_context yield) { doMigration(*backend, timer, yield); - work.reset(); + workGuard.reset(); }); ioc.run();