From fe5a45a9c801c827ec79b711f0f24c31314d1522 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 19 Oct 2023 14:00:28 +0100 Subject: [PATCH] Resume functionality (#941) --- README.md | 12 + src/main/main.cpp | 846 +++++++++++++++----------------- src/main/migration/Helpers.h | 295 +++++++++++ src/main/migration/Migrations.h | 215 ++++++++ 4 files changed, 906 insertions(+), 462 deletions(-) create mode 100644 src/main/migration/Helpers.h create mode 100644 src/main/migration/Migrations.h diff --git a/README.md b/README.md index 6794e90e1..358de6088 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,18 @@ so: ./clio_migrator ``` +#### Repair and Resume modes +The migrator will fail if some transactions are corrupted or missing from the cassandra DB. To battle this the migrator can be ran in `repair` mode. +If enabled via the `-repair host:port` argument the migrator will attempt to download the transaction from the specified Clio/`rippled` server and write it to the DB. +```bash +./clio_migrator path/to/config --repair 127.0.0.1:6006 # repair from `rippled` serving on 127.0.0.1:6006 +``` + +If the migrator failed and exit (or crashed) you can resume from where it stopped by using the `--resume` option. +```bash +./clio_migrator path/to/config --resume # can be used together with --repair option +``` + ### OPTIONAL: running the verifier After the migration completes, it is optional to perform a database verification to ensure the URIs are migrated correctly. Again, use the old config file you copied in Step 0 above. diff --git a/src/main/main.cpp b/src/main/main.cpp index baa0685fc..175994083 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -3,529 +3,396 @@ #include #include #include
+#include
+#include
#include #include -#include -#include #include -#include +#include #include #include -static std::uint32_t const MAX_RETRIES = 5; -static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60); -static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000; - -static void -wait( - boost::asio::steady_timer& timer, - std::string const& reason, - std::chrono::seconds timeout = WAIT_TIME) +class Step1Impl { - clio::LogService::info() << reason << ". Waiting then retrying"; - timer.expires_after(timeout); - timer.wait(); - clio::LogService::info() << "Done waiting"; -} + std::string tag_; + std::shared_ptr backend_; + std::reference_wrapper resumeProvider_; + boost::json::object resumeData_; -static std::optional -doRequestFromRippled( - clio::Config const& config, - boost::json::object const& request) -{ - auto source = config.array("etl_sources").at(0); - auto const ip = source.value("ip"); - auto const wsPort = source.value("ws_port"); + boost::asio::steady_timer timer_; + std::optional repairAddress_; - clio::LogService::debug() - << "Attempting to forward request to tx. " - << "request = " << boost::json::serialize(request); - - boost::json::object response; - - namespace beast = boost::beast; - namespace http = beast::http; - namespace websocket = beast::websocket; - namespace net = boost::asio; - using tcp = boost::asio::ip::tcp; - - try +public: + Step1Impl( + std::string tag, + boost::asio::io_context& ioc, + std::shared_ptr backend, + ResumeContextProvider& resumeProvider, + boost::json::object resumeData, + std::optional repairAddress) + : tag_{std::move(tag)} + , backend_{backend} + , resumeProvider_{std::ref(resumeProvider)} + , resumeData_{std::move(resumeData)} + , timer_{ioc} + , repairAddress_{repairAddress} { - boost::asio::io_context ioc; - tcp::resolver resolver{ioc}; + } - auto ws = std::make_unique>(ioc); - auto const results = resolver.resolve(ip, wsPort); + void + perform( + boost::asio::yield_context yield, + Backend::LedgerRange const& ledgerRange) + { + /* + * Step 1 - Look at all NFT transactions recorded in + * `nf_token_transactions` and reload any NFTokenMint transactions. + * These will contain the URI of any tokens that were minted after our + * start sequence. We look at transactions for this step instead of + * directly at the tokens in `nf_tokens` because we also want to cover + * the extreme edge case of a token that is re-minted with a different + * URI. + */ + std::vector toWrite; - ws->next_layer().expires_after(std::chrono::seconds(15)); - ws->next_layer().connect(results); + clio::LogService::info() << "Running " << tag_; + resumeProvider_.get().write({tag_, {}}); // at the start of step1 - ws->handshake(ip, "/"); - ws->write(net::buffer(boost::json::serialize(request))); + std::stringstream query; + query << "SELECT hash FROM " << backend_->tablePrefix() + << "nf_token_transactions"; + CassStatement* nftTxQuery = cass_statement_new(query.str().c_str(), 0); + cass_statement_set_paging_size(nftTxQuery, 1000); + cass_bool_t morePages = cass_true; - beast::flat_buffer buffer; - ws->read(buffer); - - auto begin = static_cast(buffer.data().data()); - auto end = begin + buffer.data().size(); - auto parsed = boost::json::parse(std::string(begin, end)); - - if (!parsed.is_object()) + if (not resumeData_.empty() and resumeData_.contains("token") and + resumeData_.at("token").is_string() and + not resumeData_.at("token").as_string().empty()) { - clio::LogService::error() - << "Error parsing response: " << std::string{begin, end}; - return {}; + clio::LogService::info() << " -- Restoring previous state.."; + + auto encodedState = + std::string{resumeData_.at("token").as_string().c_str()}; + auto state = ripple::base64_decode(encodedState); + + cass_statement_set_paging_state_token( + nftTxQuery, state.c_str(), state.size()); + + clio::LogService::info() + << " Resuming from page " << encodedState; } - return parsed.as_object(); - } - catch (std::exception const& e) - { - clio::LogService::fatal() << "Encountered exception : " << e.what(); - return {}; - } -} - -static std::optional -requestFromRippled( - boost::asio::steady_timer& timer, - clio::Config const& config, - boost::json::object const& request, - std::uint32_t const attempts = 0) -{ - auto response = doRequestFromRippled(config, request); - if (response.has_value()) - return response; - - if (attempts >= MAX_RETRIES) - return std::nullopt; - - wait(timer, "Failed to request from rippled", std::chrono::seconds{1}); - return requestFromRippled(timer, config, request, attempts + 1); -} - -static std::string -hexStringToBinaryString(std::string hex) -{ - auto blob = ripple::strUnHex(hex); - std::string strBlob; - for (auto c : *blob) - strBlob += c; - return strBlob; -} - -static void -maybeWriteTransaction( - Backend::CassandraBackend& backend, - std::optional const& tx) -{ - if (!tx.has_value()) - throw std::runtime_error("Could not repair transaction"); - - auto package = tx.value(); - if (!package.contains("result") || !package.at("result").is_object() || - package.at("result").as_object().contains("error")) - throw std::runtime_error("Received non-success response from rippled"); - - auto data = package.at("result").as_object(); - - auto const date = data.at("date").as_int64(); - auto const ledgerIndex = data.at("ledger_index").as_int64(); - auto hashStr = hexStringToBinaryString(data.at("hash").as_string().c_str()); - auto metaStr = hexStringToBinaryString(data.at("meta").as_string().c_str()); - auto txStr = hexStringToBinaryString(data.at("tx").as_string().c_str()); - - backend.writeTransaction( - std::move(hashStr), - ledgerIndex, - date, - std::move(txStr), - std::move(metaStr)); - backend.sync(); -} - -static void -repairCorruptedTx( - boost::asio::steady_timer& timer, - clio::Config const& config, - Backend::CassandraBackend& backend, - ripple::uint256 const& hash) -{ - clio::LogService::info() << " - repairing " << hash; - auto const data = requestFromRippled( - timer, - config, + // For all NFT txs, paginated in groups of 1000... + while (morePages) { - {"method", "tx"}, - {"transaction", to_string(hash)}, - {"binary", true}, - }); + CassResult const* result = + doTryGetTxPageResult(nftTxQuery, timer_, backend_); - maybeWriteTransaction(backend, data); -} + std::vector txHashes; -static std::vector -doNFTWrite( - std::vector& nfts, - Backend::CassandraBackend& backend, - std::string const& tag) -{ - auto const size = nfts.size(); - if (size == 0) - return nfts; - backend.writeNFTs(std::move(nfts)); - backend.sync(); - clio::LogService::info() << tag << ": Wrote " << size << " records"; - return {}; -} - -static std::vector -maybeDoNFTWrite( - std::vector& nfts, - Backend::CassandraBackend& backend, - std::string const& tag) -{ - if (nfts.size() < NFT_WRITE_BATCH_SIZE) - return nfts; - return doNFTWrite(nfts, backend, tag); -} - -static std::vector -doTryFetchTransactions( - boost::asio::steady_timer& timer, - Backend::CassandraBackend& backend, - std::vector const& hashes, - boost::asio::yield_context& yield, - std::uint32_t const attempts = 0) -{ - try - { - return backend.fetchTransactions(hashes, yield); - } - catch (Backend::DatabaseTimeout const& e) - { - if (attempts >= MAX_RETRIES) - throw e; - - wait(timer, "Transactions read error"); - return doTryFetchTransactions( - timer, backend, hashes, yield, attempts + 1); - } -} - -static Backend::LedgerPage -doTryFetchLedgerPage( - boost::asio::steady_timer& timer, - Backend::CassandraBackend& backend, - std::optional const& cursor, - std::uint32_t const sequence, - boost::asio::yield_context& yield, - std::uint32_t const attempts = 0) -{ - try - { - return backend.fetchLedgerPage(cursor, sequence, 10000, false, yield); - } - catch (Backend::DatabaseTimeout const& e) - { - if (attempts >= MAX_RETRIES) - throw e; - - wait(timer, "Page read error"); - return doTryFetchLedgerPage( - timer, backend, cursor, sequence, yield, attempts + 1); - } -} - -static const CassResult* -doTryGetTxPageResult( - CassStatement* const query, - boost::asio::steady_timer& timer, - Backend::CassandraBackend& backend, - std::uint32_t const attempts = 0) -{ - CassFuture* fut = cass_session_execute(backend.cautionGetSession(), query); - CassResult const* result = cass_future_get_result(fut); - cass_future_free(fut); - - if (result != nullptr) - return result; - - if (attempts >= MAX_RETRIES) - throw std::runtime_error("Already retried too many times"); - - wait(timer, "Unexpected empty result from tx paging"); - return doTryGetTxPageResult(query, timer, backend, attempts + 1); -} - -static void -doMigrationStepOne( - clio::Config const& config, - Backend::CassandraBackend& backend, - boost::asio::steady_timer& timer, - boost::asio::yield_context& yield, - Backend::LedgerRange const& ledgerRange, - bool repairEnabled = false) -{ - /* - * Step 1 - Look at all NFT transactions recorded in - * `nf_token_transactions` and reload any NFTokenMint transactions. These - * will contain the URI of any tokens that were minted after our start - * sequence. We look at transactions for this step instead of directly at - * the tokens in `nf_tokens` because we also want to cover the extreme - * edge case of a token that is re-minted with a different URI. - */ - std::string const stepTag = "Step 1 - transaction loading"; - std::vector toWrite; - - std::stringstream query; - query << "SELECT hash FROM " << backend.tablePrefix() - << "nf_token_transactions"; - CassStatement* nftTxQuery = cass_statement_new(query.str().c_str(), 0); - cass_statement_set_paging_size(nftTxQuery, 1000); - cass_bool_t morePages = cass_true; - - // For all NFT txs, paginated in groups of 1000... - while (morePages) - { - CassResult const* result = - doTryGetTxPageResult(nftTxQuery, timer, backend); - - std::vector txHashes; - - // For each tx in page... - CassIterator* txPageIterator = cass_iterator_from_result(result); - while (cass_iterator_next(txPageIterator)) - { - cass_byte_t const* buf; - std::size_t bufSize; - - CassError const rc = cass_value_get_bytes( - cass_row_get_column(cass_iterator_get_row(txPageIterator), 0), - &buf, - &bufSize); - if (rc != CASS_OK) + // For each tx in page... + CassIterator* txPageIterator = cass_iterator_from_result(result); + while (cass_iterator_next(txPageIterator)) { - cass_iterator_free(txPageIterator); - cass_result_free(result); - cass_statement_free(nftTxQuery); - throw std::runtime_error( - "Could not retrieve hash from nf_token_transactions"); + cass_byte_t const* buf; + std::size_t bufSize; + + CassError const rc = cass_value_get_bytes( + cass_row_get_column( + cass_iterator_get_row(txPageIterator), 0), + &buf, + &bufSize); + if (rc != CASS_OK) + { + cass_iterator_free(txPageIterator); + cass_result_free(result); + cass_statement_free(nftTxQuery); + throw std::runtime_error( + "Could not retrieve hash from nf_token_transactions"); + } + + txHashes.push_back(ripple::uint256::fromVoid(buf)); } - txHashes.push_back(ripple::uint256::fromVoid(buf)); - } + auto txs = + doTryFetchTransactions(timer_, backend_, txHashes, yield); + if (txs.size() != txHashes.size()) + throw std::runtime_error( + "Amount of hashes does not match amount of retrieved " + "transactions"); - auto txs = doTryFetchTransactions(timer, backend, txHashes, yield); - if (txs.size() != txHashes.size()) - throw std::runtime_error( - "Amount of hashes does not match amount of retrieved " - "transactions"); - - for (int32_t idx = 0; idx < txHashes.size(); ++idx) - { - auto const& tx = txs.at(idx); - auto const& hash = txHashes.at(idx); - - if (tx.ledgerSequence > ledgerRange.maxSequence) - continue; - - try + for (int32_t idx = 0; idx < txHashes.size(); ++idx) { - ripple::STTx const sttx{ripple::SerialIter{ - tx.transaction.data(), tx.transaction.size()}}; - if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT) + auto const& tx = txs.at(idx); + auto const& hash = txHashes.at(idx); + + if (tx.ledgerSequence > ledgerRange.maxSequence) continue; - ripple::TxMeta const txMeta{ - sttx.getTransactionID(), tx.ledgerSequence, tx.metadata}; - toWrite.push_back( - std::get<1>(getNFTDataFromTx(txMeta, sttx)).value()); + try + { + ripple::STTx const sttx{ripple::SerialIter{ + tx.transaction.data(), tx.transaction.size()}}; + if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT) + continue; + + ripple::TxMeta const txMeta{ + sttx.getTransactionID(), + tx.ledgerSequence, + tx.metadata}; + toWrite.push_back( + std::get<1>(getNFTDataFromTx(txMeta, sttx)).value()); + } + catch (std::exception const& e) + { + clio::LogService::warn() + << "Corrupted tx detected: " << hash; + std::cerr << "Corrupted tx detected: " << hash << std::endl; + + if (not repairAddress_.has_value()) + { + clio::LogService::fatal() + << "Not attempting to repair. Rerun with `--repair " + "[host:port]` to repair corrupted transactions."; + exit(-1); + } + + repairCorruptedTx(timer_, *repairAddress_, backend_, hash); + + auto maybeTx = backend_->fetchTransaction(hash, yield); + + if (!maybeTx.has_value()) + { + clio::LogService::fatal() + << "Could not fetch written transaction for hash " + << hash << "; Repair failed."; + exit(-1); + } + + txs[idx] = maybeTx.value(); + --idx; // repeat the try section for the repaired tx + std::cerr << "+ tx repaired: " << hash << std::endl; + } } - catch (std::exception const& e) + + toWrite = maybeDoNFTWrite(toWrite, backend_, tag_); + + morePages = cass_result_has_more_pages(result); + if (morePages) { - clio::LogService::warn() << "Corrupted tx detected: " << hash; - std::cerr << "Corrupted tx detected: " << hash << std::endl; + char const* state = nullptr; + std::size_t sz; + cass_result_paging_state_token(result, &state, &sz); + cass_statement_set_paging_state_token(nftTxQuery, state, sz); - if (not repairEnabled) + // only update resume token if data was actually written to DB + if (toWrite.empty()) { - clio::LogService::fatal() - << "Not attempting to repair. Rerun with -repair to " - "repair corrupted transactions."; - exit(-1); + resumeProvider_.get().write( + {tag_, + {{"token", + ripple::base64_encode(std::string{state, sz})}}}); } - - repairCorruptedTx(timer, config, backend, hash); - - auto maybeTx = backend.fetchTransaction(hash, yield); - - if (!maybeTx.has_value()) - { - clio::LogService::fatal() - << "Could not fetch written transaction for hash " - << hash << "; Repair failed."; - exit(-1); - } - - txs[idx] = maybeTx.value(); - --idx; // repeat the try section for the repaired tx - std::cerr << "+ tx repaired: " << hash << std::endl; } + + cass_iterator_free(txPageIterator); + cass_result_free(result); } - toWrite = maybeDoNFTWrite(toWrite, backend, stepTag); + cass_statement_free(nftTxQuery); + doNFTWrite(toWrite, backend_, tag_); + } +}; - morePages = cass_result_has_more_pages(result); - if (morePages) - cass_statement_set_paging_state(nftTxQuery, result); - cass_iterator_free(txPageIterator); - cass_result_free(result); +class Step2Impl +{ + std::string tag_; + std::shared_ptr backend_; + std::reference_wrapper resumeProvider_; + boost::json::object resumeData_; + + boost::asio::steady_timer timer_; + +public: + Step2Impl( + std::string tag, + boost::asio::io_context& ioc, + std::shared_ptr backend, + ResumeContextProvider& resumeProvider, + boost::json::object resumeData) + : tag_{std::move(tag)} + , backend_{backend} + , resumeProvider_{std::ref(resumeProvider)} + , resumeData_{std::move(resumeData)} + , timer_{ioc} + { } - cass_statement_free(nftTxQuery); - doNFTWrite(toWrite, backend, stepTag); -} - -static void -doMigrationStepTwo( - Backend::CassandraBackend& backend, - boost::asio::steady_timer& timer, - boost::asio::yield_context& yield, - Backend::LedgerRange const& ledgerRange) -{ - /* - * Step 2 - Pull every object from our initial ledger and load all NFTs - * found in any NFTokenPage object. Prior to this migration, we were not - * pulling out NFTs from the initial ledger, so all these NFTs would be - * missed. This will also record the URI of any NFTs minted prior to the - * start sequence. - */ - std::string const stepTag = "Step 2 - initial ledger loading"; - std::vector toWrite; - std::optional cursor; - - // For each object page in initial ledger - do + void + perform( + boost::asio::yield_context yield, + Backend::LedgerRange const& ledgerRange) { - auto const page = doTryFetchLedgerPage( - timer, backend, cursor, ledgerRange.minSequence, yield); + /* + * Step 2 - Pull every object from our initial ledger and load all NFTs + * found in any NFTokenPage object. Prior to this migration, we were not + * pulling out NFTs from the initial ledger, so all these NFTs would be + * missed. This will also record the URI of any NFTs minted prior to the + * start sequence. + */ + std::vector toWrite; + std::optional cursor; - // For each object in page - for (auto const& object : page.objects) + clio::LogService::info() << "Running " << tag_; + resumeProvider_.get().write({tag_, {}}); // at the start of step2 + + if (not resumeData_.empty() and resumeData_.contains("cursor") and + resumeData_.at("cursor").is_string() and + not resumeData_.at("cursor").as_string().empty()) { - auto const objectNFTs = getNFTDataFromObj( - ledgerRange.minSequence, - std::string(object.key.begin(), object.key.end()), - std::string(object.blob.begin(), object.blob.end())); - toWrite.insert(toWrite.end(), objectNFTs.begin(), objectNFTs.end()); + clio::LogService::info() << " -- Restoring previous state.."; + cursor = + ripple::strUnHex(resumeData_.at("cursor").as_string().c_str()); + clio::LogService::info() << " Resuming from " << *cursor; } - toWrite = maybeDoNFTWrite(toWrite, backend, stepTag); - cursor = page.cursor; - } while (cursor.has_value()); + // For each object page in initial ledger + do + { + auto const page = doTryFetchLedgerPage( + timer_, backend_, cursor, ledgerRange.minSequence, yield); - doNFTWrite(toWrite, backend, stepTag); -} + // For each object in page + for (auto const& object : page.objects) + { + auto const objectNFTs = getNFTDataFromObj( + ledgerRange.minSequence, + std::string(object.key.begin(), object.key.end()), + std::string(object.blob.begin(), object.blob.end())); + toWrite.insert( + toWrite.end(), objectNFTs.begin(), objectNFTs.end()); + } -static void -doMigrationStepThree(Backend::CassandraBackend& backend) + toWrite = maybeDoNFTWrite(toWrite, backend_, tag_); + cursor = page.cursor; + + // only update resume token if data was actually written to DB + if (cursor.has_value() && toWrite.empty()) + resumeProvider_.get().write( + {tag_, {{"cursor", std::string{ripple::strHex(*cursor)}}}}); + + } while (cursor.has_value()); + + doNFTWrite(toWrite, backend_, tag_); + } +}; + +class Step3Impl { - /* - * Step 3 - Drop the old `issuer_nf_tokens` table, which is replaced by - * `issuer_nf_tokens_v2`. Normally, we should probably not drop old tables - * in migrations, but here it is safe since the old table wasn't yet being - * used to serve any data anyway. - */ - std::stringstream query; - query << "DROP TABLE " << backend.tablePrefix() << "issuer_nf_tokens"; - CassStatement* issuerDropTableQuery = - cass_statement_new(query.str().c_str(), 0); - CassFuture* fut = - cass_session_execute(backend.cautionGetSession(), issuerDropTableQuery); - CassError const rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(issuerDropTableQuery); - backend.sync(); + std::string tag_; + std::shared_ptr backend_; + std::reference_wrapper resumeProvider_; - if (rc != CASS_OK) - clio::LogService::warn() << "Could not drop old issuer_nf_tokens " - "table. If it still exists, " - "you should drop it yourself\n"; -} - -static void -doMigration( - clio::Config const& config, - Backend::CassandraBackend& backend, - boost::asio::steady_timer& timer, - boost::asio::yield_context& yield, - bool repairEnabled = false) -{ - clio::LogService::info() << "Beginning migration"; - auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield); - - /* - * Step 0 - If we haven't downloaded the initial ledger yet, just short - * circuit. - */ - if (!ledgerRange) +public: + Step3Impl( + std::string tag, + std::shared_ptr backend, + ResumeContextProvider& resumeProvider) + : tag_{std::move(tag)} + , backend_{backend} + , resumeProvider_{std::ref(resumeProvider)} { - clio::LogService::info() << "There is no data to migrate"; - return; } - doMigrationStepOne( - config, backend, timer, yield, *ledgerRange, repairEnabled); - clio::LogService::info() << "\nStep 1 done!\n"; + void + perform() + { + /* + * Step 3 - Drop the old `issuer_nf_tokens` table, which is replaced by + * `issuer_nf_tokens_v2`. Normally, we should probably not drop old + * tables in migrations, but here it is safe since the old table wasn't + * yet being used to serve any data anyway. + */ + clio::LogService::info() << "Running " << tag_; + resumeProvider_.get().write({tag_, {}}); // at the start of step3 - doMigrationStepTwo(backend, timer, yield, *ledgerRange); - clio::LogService::info() << "\nStep 2 done!\n"; + std::stringstream query; + query << "DROP TABLE " << backend_->tablePrefix() << "issuer_nf_tokens"; + CassStatement* issuerDropTableQuery = + cass_statement_new(query.str().c_str(), 0); + CassFuture* fut = cass_session_execute( + backend_->cautionGetSession(), issuerDropTableQuery); + CassError const rc = cass_future_error_code(fut); + cass_future_free(fut); + cass_statement_free(issuerDropTableQuery); + backend_->sync(); - doMigrationStepThree(backend); - clio::LogService::info() << "\nStep 3 done!\n"; - - clio::LogService::info() - << "\nCompleted migration from " << ledgerRange->minSequence << " to " - << ledgerRange->maxSequence << "!\n"; -} + if (rc != CASS_OK) + clio::LogService::warn() << "Could not drop old issuer_nf_tokens " + "table. If it still exists, " + "you should drop it yourself\n"; + } +}; static void usage() { std::cerr << "\nUsage:\n" - << " with repair: clio_migrator path/to/config -repair 2> " - "repair.log\n" - << " without repair: clio_migrator path/to/config" << std::endl; + << " without repair: clio_migrator path/to/config\n" + << " with repair: clio_migrator path/to/config --repair " + "127.0.0.1:6006 2> repair.log\n" + << "resume previous run: clio_migrator path/to/config --resume\n" + << " use both together: clio_migrator path/to/config -Rr " + "192.168.0.10:51233" + << std::endl; } int main(int argc, char* argv[]) +try { - if (argc < 2) + namespace po = boost::program_options; + auto repairAddress = std::optional{}; + auto resumeEnabled = false; + + // clang-format off + po::options_description description("Options"); + description.add_options() + ("help,h", "print help message and exit") + ("resume,R", "attempt to resume with previous progress") + ("repair,r", po::value(), "specify repair server. format: `host:port`") + ("conf,c", po::value(), "specify a configuration file") + ; + // clang-format on + po::positional_options_description positional; + positional.add("conf", 1); + + po::variables_map parsed; + po::store( + po::command_line_parser(argc, argv) + .options(description) + .positional(positional) + .run(), + parsed); + po::notify(parsed); + + if (parsed.count("conf") == 0u) { - std::cerr << "Didn't provide config path." << std::endl; + std::cout << description << std::endl; usage(); - return EXIT_FAILURE; + std::exit(EXIT_FAILURE); } - auto repairEnabled = false; - if (argc >= 3) + if (parsed.count("help") != 0u) { - if (not boost::iequals(argv[2], "-repair")) - { - std::cerr << "Final argument must be `-repair`." << std::endl; - usage(); - return EXIT_FAILURE; - } - clio::LogService::info() - << "Enabling REPAIR mode. Missing/broken transactions will be " - "downloaded from rippled and overwritten."; - repairEnabled = true; + std::cout << description << std::endl; + usage(); + std::exit(EXIT_SUCCESS); } - std::string const configPath = argv[1]; + std::string const configPath = parsed["conf"].as(); auto const config = clio::ConfigReader::open(configPath); if (!config) { @@ -543,20 +410,75 @@ main(int argc, char* argv[]) return EXIT_FAILURE; } - boost::asio::io_context ioc; - boost::asio::steady_timer timer{ioc}; - auto workGuard = boost::asio::make_work_guard(ioc); - auto backend = Backend::make_Backend(ioc, config); + if (parsed.count("repair") != 0u) + { + repairAddress = parsed["repair"].as(); + parseHostPort(*repairAddress); // throws on wrong format - boost::asio::spawn( + clio::LogService::info() + << "Enabling REPAIR mode. Missing/broken transactions will be " + "downloaded from Clio/rippled at " + << *repairAddress << " and overwritten."; + } + + if (parsed.count("resume") != 0u) + { + resumeEnabled = true; + clio::LogService::info() + << "Enabling RESUME mode. Will attempt to restore previously saved " + "state from `resume.json`."; + } + + boost::asio::io_context ioc; + auto backend = Backend::make_Backend(ioc, config); + auto resumeProvider = ResumeContextProvider( + std::filesystem::current_path() / "resume.json", resumeEnabled); + auto migrator = Migrator{ ioc, - [&config, &backend, &workGuard, &timer, &repairEnabled]( - boost::asio::yield_context yield) { - doMigration(config, *backend, timer, yield, repairEnabled); - workGuard.reset(); - }); + config, + backend, + resumeProvider, + { + Step( + "Step 1 - transaction loading", + [&](auto tag, + auto yield, + auto const& ledgerRange, + auto resumeData) { + Step1Impl( + tag, + ioc, + backend, + resumeProvider, + resumeData, + repairAddress) + .perform(yield, ledgerRange); + }), + Step( + "Step 2 - initial ledger loading", + [&](auto tag, + auto yield, + auto const& ledgerRange, + auto resumeData) { + Step2Impl(tag, ioc, backend, resumeProvider, resumeData) + .perform(yield, ledgerRange); + }), + Step( + "Step 3 - cleanup", + [&](auto tag, + auto yield, + auto const& ledgerRange, + auto resumeData) { + Step3Impl(tag, backend, resumeProvider).perform(); + }), + }}; ioc.run(); + clio::LogService::info() << "SUCCESS!"; return EXIT_SUCCESS; } +catch (std::exception const& ex) +{ + std::cerr << ex.what() << std::endl; +} diff --git a/src/main/migration/Helpers.h b/src/main/migration/Helpers.h new file mode 100644 index 000000000..8b9acd5b6 --- /dev/null +++ b/src/main/migration/Helpers.h @@ -0,0 +1,295 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +static std::uint32_t const MAX_RETRIES = 5; +static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60); +static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000; + +static void +wait( + boost::asio::steady_timer& timer, + std::string const& reason, + std::chrono::seconds timeout = WAIT_TIME) +{ + clio::LogService::info() << reason << ". Waiting then retrying"; + timer.expires_after(timeout); + timer.wait(); + clio::LogService::info() << "Done waiting"; +} + +static std::pair +parseHostPort(std::string input) +{ + std::vector components; + boost::split(components, input, boost::is_any_of(":")); + + if (components.size() != 2) + throw std::logic_error( + "Host and port must be specified as `host:port` string. Got " + "instead: `" + + input + "`"); + + return std::make_pair(components.at(0), components.at(1)); +} + +static std::optional +doRequestFromRippled( + std::string repairAddress, + boost::json::object const& request) +{ + auto const [ip, wsPort] = parseHostPort(repairAddress); + clio::LogService::debug() + << "Attempting to forward request to repair server. " + << "request = " << boost::json::serialize(request); + + boost::json::object response; + + namespace beast = boost::beast; + namespace http = beast::http; + namespace websocket = beast::websocket; + namespace net = boost::asio; + using tcp = boost::asio::ip::tcp; + + try + { + boost::asio::io_context ioc; + tcp::resolver resolver{ioc}; + + auto ws = std::make_unique>(ioc); + auto const results = resolver.resolve(ip, wsPort); + + ws->next_layer().expires_after(std::chrono::seconds(15)); + ws->next_layer().connect(results); + + ws->handshake(ip, "/"); + ws->write(net::buffer(boost::json::serialize(request))); + + beast::flat_buffer buffer; + ws->read(buffer); + + auto begin = static_cast(buffer.data().data()); + auto end = begin + buffer.data().size(); + auto parsed = boost::json::parse(std::string(begin, end)); + + if (!parsed.is_object()) + { + clio::LogService::error() + << "Error parsing response: " << std::string{begin, end}; + return {}; + } + + return parsed.as_object(); + } + catch (std::exception const& e) + { + clio::LogService::fatal() << "Encountered exception : " << e.what(); + return {}; + } +} + +static std::optional +requestFromRippled( + boost::asio::steady_timer& timer, + std::string repairAddress, + boost::json::object const& request, + std::uint32_t const attempts = 0) +{ + auto response = doRequestFromRippled(repairAddress, request); + if (response.has_value()) + return response; + + if (attempts >= MAX_RETRIES) + return std::nullopt; + + wait(timer, "Failed to request from rippled", std::chrono::seconds{1}); + return requestFromRippled(timer, repairAddress, request, attempts + 1); +} + +static std::string +hexStringToBinaryString(std::string hex) +{ + auto blob = ripple::strUnHex(hex); + std::string strBlob; + for (auto c : *blob) + strBlob += c; + return strBlob; +} + +static void +maybeWriteTransaction( + std::shared_ptr const& backend, + std::optional const& tx) +{ + if (!tx.has_value()) + throw std::runtime_error("Could not repair transaction"); + + auto package = tx.value(); + if (!package.contains("result") || !package.at("result").is_object() || + package.at("result").as_object().contains("error")) + throw std::runtime_error("Received non-success response from rippled"); + + auto data = package.at("result").as_object(); + + auto const date = data.at("date").as_int64(); + auto const ledgerIndex = data.at("ledger_index").as_int64(); + auto hashStr = hexStringToBinaryString(data.at("hash").as_string().c_str()); + auto metaStr = hexStringToBinaryString(data.at("meta").as_string().c_str()); + auto txStr = hexStringToBinaryString(data.at("tx").as_string().c_str()); + + backend->writeTransaction( + std::move(hashStr), + ledgerIndex, + date, + std::move(txStr), + std::move(metaStr)); + backend->sync(); +} + +static void +repairCorruptedTx( + boost::asio::steady_timer& timer, + std::string repairAddress, + std::shared_ptr const& backend, + ripple::uint256 const& hash) +{ + clio::LogService::info() << " - repairing " << hash; + auto const data = requestFromRippled( + timer, + repairAddress, + { + {"method", "tx"}, + {"transaction", to_string(hash)}, + {"binary", true}, + }); + + maybeWriteTransaction(backend, data); +} + +static std::vector +doNFTWrite( + std::vector& nfts, + std::shared_ptr const& backend, + std::string const& tag) +{ + auto const size = nfts.size(); + if (size == 0) + return nfts; + backend->writeNFTs(std::move(nfts)); + backend->sync(); + clio::LogService::info() << tag << ": Wrote " << size << " records"; + return {}; +} + +static std::vector +maybeDoNFTWrite( + std::vector& nfts, + std::shared_ptr const& backend, + std::string const& tag) +{ + if (nfts.size() < NFT_WRITE_BATCH_SIZE) + return nfts; + return doNFTWrite(nfts, backend, tag); +} + +static std::vector +doTryFetchTransactions( + boost::asio::steady_timer& timer, + std::shared_ptr const& backend, + std::vector const& hashes, + boost::asio::yield_context& yield, + std::uint32_t const attempts = 0) +{ + try + { + return backend->fetchTransactions(hashes, yield); + } + catch (Backend::DatabaseTimeout const& e) + { + if (attempts >= MAX_RETRIES) + throw e; + + wait(timer, "Transactions read error"); + return doTryFetchTransactions( + timer, backend, hashes, yield, attempts + 1); + } +} + +static Backend::LedgerPage +doTryFetchLedgerPage( + boost::asio::steady_timer& timer, + std::shared_ptr const& backend, + std::optional const& cursor, + std::uint32_t const sequence, + boost::asio::yield_context& yield, + std::uint32_t const attempts = 0) +{ + try + { + return backend->fetchLedgerPage(cursor, sequence, 10000, false, yield); + } + catch (Backend::DatabaseTimeout const& e) + { + if (attempts >= MAX_RETRIES) + throw e; + + wait(timer, "Page read error"); + return doTryFetchLedgerPage( + timer, backend, cursor, sequence, yield, attempts + 1); + } +} + +static const CassResult* +doTryGetTxPageResult( + CassStatement* const query, + boost::asio::steady_timer& timer, + std::shared_ptr const& backend, + std::uint32_t const attempts = 0) +{ + CassFuture* fut = cass_session_execute(backend->cautionGetSession(), query); + CassResult const* result = cass_future_get_result(fut); + cass_future_free(fut); + + if (result != nullptr) + return result; + + if (attempts >= MAX_RETRIES) + throw std::runtime_error("Already retried too many times"); + + wait(timer, "Unexpected empty result from tx paging"); + return doTryGetTxPageResult(query, timer, backend, attempts + 1); +} diff --git a/src/main/migration/Migrations.h b/src/main/migration/Migrations.h new file mode 100644 index 000000000..02ccdf4fe --- /dev/null +++ b/src/main/migration/Migrations.h @@ -0,0 +1,215 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include +#include +#include + +#include +#include + +#include +#include +#include + +struct ResumeContext +{ + std::string tag; + boost::json::object data; + + ResumeContext(std::string tag, boost::json::object data) + : tag{std::move(tag)}, data{std::move(data)} + { + } +}; + +class ResumeContextProvider +{ + std::filesystem::path path_; + bool enable_ = false; + +public: + ResumeContextProvider(std::filesystem::path path, bool enable) + : path_{path}, enable_{enable} + { + if (enable) + clio::LogService::info() + << "Resume context path: " << path_.string(); + } + + std::optional + load() + { + if (not enable_) + return std::nullopt; + + if (not std::filesystem::exists(path_)) + return std::nullopt; + + auto const is = std::ifstream{path_.string()}; + if (not is.is_open()) + return std::nullopt; + + auto buffer = std::stringstream{}; + buffer << is.rdbuf(); + + auto const value = boost::json::parse(buffer.str()); + if (not value.is_object()) + return std::nullopt; + + auto const& obj = value.as_object(); + if (not obj.contains("step") or not obj.contains("state")) + return std::nullopt; + + return std::make_optional( + std::string{obj.at("step").as_string().c_str()}, + obj.at("state").as_object()); + } + + void + write(ResumeContext ctx) + { + std::ofstream os(path_.string()); + if (os.good()) + { + auto obj = boost::json::object{ + {"step", ctx.tag}, + {"state", ctx.data}, + }; + os << boost::json::serialize(obj) << std::endl; + } + } +}; + +class Step +{ + std::string tag_; + std::function + worker_; + +public: + Step(std::string tag, auto&& fn) + : tag_{std::move(tag)}, worker_{std::move(fn)} + { + } + + void + perform( + boost::asio::yield_context yield, + Backend::LedgerRange const& ledgerRange, + boost::json::object resume = {}) + { + worker_(tag_, yield, ledgerRange, std::move(resume)); + } + + std::string + tag() const + { + return tag_; + } +}; + +class Migrator +{ + std::reference_wrapper ioc_; + std::reference_wrapper config_; + std::shared_ptr backend_; + std::reference_wrapper resumeProvider_; + + boost::asio::steady_timer timer_; + std::vector steps_; + +public: + Migrator( + boost::asio::io_context& ioc, + clio::Config const& config, + std::shared_ptr backend, + ResumeContextProvider& resumeProvider, + std::vector steps) + : ioc_{std::ref(ioc)} + , config_{std::cref(config)} + , backend_{backend} + , resumeProvider_{std::ref(resumeProvider)} + , timer_{ioc} + , steps_{std::move(steps)} + { + boost::asio::spawn( + ioc, + [this, workGuard = boost::asio::make_work_guard(ioc)](auto yield) { + run(yield); + }); + } + +private: + void + run(boost::asio::yield_context yield) + { + clio::LogService::info() << "Beginning migration"; + auto const ledgerRange = backend_->hardFetchLedgerRangeNoThrow(yield); + + /* + * Step 0 - If we haven't downloaded the initial ledger yet, just short + * circuit. + */ + if (!ledgerRange) + { + clio::LogService::info() << "There is no data to migrate"; + return; + } + + auto resume = resumeProvider_.get().load(); + + for (auto& step : steps_) + { + if (resume) + { + if (resume->tag == step.tag()) + { + step.perform( + yield, + *ledgerRange, + resume->data); // resume if possible + } + else + { + clio::LogService::info() << "-- Skipping " << step.tag(); + continue; // skip this step + } + } + else + { + step.perform(yield, *ledgerRange); // start step from scratch + } + + clio::LogService::info() << step.tag() << " done!\n"; + resume = std::nullopt; // already used our resume state for + // previous step + } + + clio::LogService::info() + << "Completed migration from " << ledgerRange->minSequence << " to " + << ledgerRange->maxSequence << "!\n"; + } +};