diff --git a/src/main/main.cpp b/src/main/main.cpp index 24b38b6f7..e9fc4d930 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -3,8 +3,12 @@ #include #include #include
+#include #include +#include +#include +#include #include #include @@ -15,14 +19,151 @@ static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60); static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000; static void -wait(boost::asio::steady_timer& timer, std::string const& reason) +wait( + boost::asio::steady_timer& timer, + std::string const& reason, + std::chrono::seconds timeout = WAIT_TIME) { BOOST_LOG_TRIVIAL(info) << reason << ". Waiting then retrying"; - timer.expires_after(WAIT_TIME); + timer.expires_after(timeout); timer.wait(); BOOST_LOG_TRIVIAL(info) << "Done waiting"; } +static std::optional +doRequestFromRippled( + clio::Config const& config, + boost::json::object const& request) +{ + auto source = config.array("etl_sources").at(0); + auto const ip = source.value("ip"); + auto const wsPort = source.value("ws_port"); + + BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " + << "request = " << boost::json::serialize(request); + + boost::json::object response; + + namespace beast = boost::beast; + namespace http = beast::http; + namespace websocket = beast::websocket; + namespace net = boost::asio; + using tcp = boost::asio::ip::tcp; + + try + { + boost::asio::io_context ioc; + tcp::resolver resolver{ioc}; + + auto ws = std::make_unique>(ioc); + auto const results = resolver.resolve(ip, wsPort); + + ws->next_layer().expires_after(std::chrono::seconds(15)); + ws->next_layer().connect(results); + + ws->handshake(ip, "/"); + ws->write(net::buffer(boost::json::serialize(request))); + + beast::flat_buffer buffer; + ws->read(buffer); + + auto begin = static_cast(buffer.data().data()); + auto end = begin + buffer.data().size(); + auto parsed = boost::json::parse(std::string(begin, end)); + + if (!parsed.is_object()) + { + BOOST_LOG_TRIVIAL(error) + << "Error parsing response: " << std::string{begin, end}; + return {}; + } + + return parsed.as_object(); + } + catch (std::exception const& e) + { + BOOST_LOG_TRIVIAL(fatal) << "Encountered exception : " << e.what(); + return {}; + } +} + +static std::optional +requestFromRippled( + boost::asio::steady_timer& timer, + clio::Config const& config, + boost::json::object const& request, + std::uint32_t const attempts = 0) +{ + auto response = doRequestFromRippled(config, request); + if (response.has_value()) + return response; + + if (attempts >= MAX_RETRIES) + return std::nullopt; + + wait(timer, "Failed to request from rippled", std::chrono::seconds{1}); + return requestFromRippled(timer, config, request, attempts + 1); +} + +static std::string +hexStringToBinaryString(std::string hex) +{ + auto blob = ripple::strUnHex(hex); + std::string strBlob; + for (auto c : *blob) + strBlob += c; + return strBlob; +} + +static void +maybeWriteTransaction( + Backend::CassandraBackend& backend, + std::optional const& tx) +{ + if (!tx.has_value()) + throw std::runtime_error("Could not repair transaction"); + + auto package = tx.value(); + if (!package.contains("result") || !package.at("result").is_object()) + throw std::runtime_error("Received non-success response from rippled"); + + auto data = package.at("result").as_object(); + + auto const date = data.at("date").as_int64(); + auto const ledgerIndex = data.at("ledger_index").as_int64(); + auto hashStr = hexStringToBinaryString(data.at("hash").as_string().c_str()); + auto metaStr = hexStringToBinaryString(data.at("meta").as_string().c_str()); + auto txStr = hexStringToBinaryString(data.at("tx").as_string().c_str()); + + backend.writeTransaction( + std::move(hashStr), + ledgerIndex, + date, + std::move(txStr), + std::move(metaStr)); + backend.sync(); +} + +static void +repairCorruptedTx( + boost::asio::steady_timer& timer, + clio::Config const& config, + Backend::CassandraBackend& backend, + ripple::uint256 const& hash) +{ + BOOST_LOG_TRIVIAL(info) << " - repairing " << hash; + auto const data = requestFromRippled( + timer, + config, + { + {"method", "tx"}, + {"transaction", to_string(hash)}, + {"binary", true}, + }); + + maybeWriteTransaction(backend, data); +} + static std::vector doNFTWrite( std::vector& nfts, @@ -119,10 +260,12 @@ doTryGetTxPageResult( static void doMigrationStepOne( + clio::Config const& config, Backend::CassandraBackend& backend, boost::asio::steady_timer& timer, boost::asio::yield_context& yield, - Backend::LedgerRange const& ledgerRange) + Backend::LedgerRange const& ledgerRange, + bool repairEnabled = false) { /* * Step 1 - Look at all NFT transactions recorded in @@ -173,23 +316,61 @@ doMigrationStepOne( txHashes.push_back(ripple::uint256::fromVoid(buf)); } - auto const txs = - doTryFetchTransactions(timer, backend, txHashes, yield); + auto txs = doTryFetchTransactions(timer, backend, txHashes, yield); + if (txs.size() != txHashes.size()) + throw std::runtime_error( + "Amount of hashes does not match amount of retrieved " + "transactions"); - for (auto const& tx : txs) + for (int32_t idx = 0; idx < txHashes.size(); ++idx) { + auto const& tx = txs.at(idx); + auto const& hash = txHashes.at(idx); + if (tx.ledgerSequence > ledgerRange.maxSequence) continue; - ripple::STTx const sttx{ripple::SerialIter{ - tx.transaction.data(), tx.transaction.size()}}; - if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT) - continue; + try + { + ripple::STTx const sttx{ripple::SerialIter{ + tx.transaction.data(), tx.transaction.size()}}; + if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT) + continue; - ripple::TxMeta const txMeta{ - sttx.getTransactionID(), tx.ledgerSequence, tx.metadata}; - toWrite.push_back( - std::get<1>(getNFTDataFromTx(txMeta, sttx)).value()); + ripple::TxMeta const txMeta{ + sttx.getTransactionID(), tx.ledgerSequence, tx.metadata}; + toWrite.push_back( + std::get<1>(getNFTDataFromTx(txMeta, sttx)).value()); + } + catch (std::exception const& e) + { + BOOST_LOG_TRIVIAL(warning) << "Corrupted tx detected: " << hash; + std::cerr << "Corrupted tx detected: " << hash << std::endl; + + if (not repairEnabled) + { + BOOST_LOG_TRIVIAL(fatal) + << "Not attempting to repair. Rerun with -repair to " + "repair corrupted transactions."; + exit(-1); + } + + repairCorruptedTx(timer, config, backend, hash); + + auto maybeTx = backend.fetchTransaction(hash, yield); + + if (!maybeTx.has_value()) + { + BOOST_LOG_TRIVIAL(fatal) + << "Could not fetch written transaction for hash " + << hash << "; Repair failed."; + exit(-1); + } + + txs[idx] = maybeTx.value(); + --idx; // repeat the try section for the repaired tx + std::cerr << "+ tx repaired: " << hash << std::endl; + } } toWrite = maybeDoNFTWrite(toWrite, backend, stepTag); @@ -274,9 +455,11 @@ doMigrationStepThree(Backend::CassandraBackend& backend) static void doMigration( + clio::Config const& config, Backend::CassandraBackend& backend, boost::asio::steady_timer& timer, - boost::asio::yield_context& yield) + boost::asio::yield_context& yield, + bool repairEnabled = false) { BOOST_LOG_TRIVIAL(info) << "Beginning migration"; auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield); @@ -291,7 +474,8 @@ doMigration( return; } - doMigrationStepOne(backend, timer, yield, *ledgerRange); + doMigrationStepOne( + config, backend, timer, yield, *ledgerRange, repairEnabled); BOOST_LOG_TRIVIAL(info) << "\nStep 1 done!\n"; doMigrationStepTwo(backend, timer, yield, *ledgerRange); @@ -305,15 +489,40 @@ doMigration( << ledgerRange->maxSequence << "!\n"; } +static void +usage() +{ + std::cerr << "\nUsage:\n" + << " with repair: clio_migrator path/to/config -repair 2> " + "repair.log\n" + << " without repair: clio_migrator path/to/config" << std::endl; +} + int main(int argc, char* argv[]) { if (argc < 2) { - std::cerr << "Didn't provide config path!" << std::endl; + std::cerr << "Didn't provide config path." << std::endl; + usage(); return EXIT_FAILURE; } + auto repairEnabled = false; + if (argc >= 3) + { + if (not boost::iequals(argv[2], "-repair")) + { + std::cerr << "Final argument must be `-repair`." << std::endl; + usage(); + return EXIT_FAILURE; + } + BOOST_LOG_TRIVIAL(info) + << "Enabling REPAIR mode. Missing/broken transactions will be " + "downloaded from rippled and overwritten."; + repairEnabled = true; + } + std::string const configPath = argv[1]; auto const config = clio::ConfigReader::open(configPath); if (!config) @@ -336,8 +545,10 @@ main(int argc, char* argv[]) auto backend = Backend::make_Backend(ioc, config); boost::asio::spawn( - ioc, [&backend, &workGuard, &timer](boost::asio::yield_context yield) { - doMigration(*backend, timer, yield); + ioc, + [&config, &backend, &workGuard, &timer, &repairEnabled]( + boost::asio::yield_context yield) { + doMigration(config, *backend, timer, yield, repairEnabled); workGuard.reset(); }); diff --git a/src/rpc/common/Validators.h b/src/rpc/common/Validators.h index 41546c68f..02a38eff5 100644 --- a/src/rpc/common/Validators.h +++ b/src/rpc/common/Validators.h @@ -230,7 +230,7 @@ public: * @brief Deduction guide to help disambiguate what it means to EqualTo a * "string" without specifying the type. */ -EqualTo(char const*)->EqualTo; +EqualTo(char const*) -> EqualTo; /** * @brief Validates that the value is one of the values passed in @@ -278,7 +278,7 @@ public: * @brief Deduction guide to help disambiguate what it means to OneOf a * few "strings" without specifying the type. */ -OneOf(std::initializer_list)->OneOf; +OneOf(std::initializer_list) -> OneOf; /** * @brief A meta-validator that specifies a list of specs to run against the