Resume functionality (#941)

This commit is contained in:
Alex Kremer
2023-10-19 14:00:28 +01:00
committed by GitHub
parent 4293b56aed
commit fe5a45a9c8
4 changed files with 906 additions and 462 deletions

View File

@@ -57,6 +57,18 @@ so:
./clio_migrator <config path>
```
#### Repair and Resume modes
The migrator will fail if some transactions are corrupted or missing from the Cassandra DB. To work around this, the migrator can be run in `repair` mode.
If enabled via the `--repair host:port` argument, the migrator will attempt to download the affected transaction from the specified Clio/`rippled` server and write it to the DB.
```bash
./clio_migrator path/to/config --repair 127.0.0.1:6006 # repair from `rippled` serving on 127.0.0.1:6006
```
If the migrator failed and exited (or crashed), you can resume from where it stopped by using the `--resume` option.
```bash
./clio_migrator path/to/config --resume # can be used together with --repair option
```
### OPTIONAL: running the verifier
After the migration completes, it is optional to perform a database verification to ensure the URIs are migrated correctly.
Again, use the old config file you copied in Step 0 above.

View File

@@ -3,529 +3,396 @@
#include <config/Config.h>
#include <etl/NFTHelpers.h>
#include <main/Build.h>
#include <main/migration/Helpers.h>
#include <main/migration/Migrations.h>
#include <rpc/RPCHelpers.h>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/json.hpp>
#include <boost/log/trivial.hpp>
#include <boost/program_options.hpp>
#include <cassandra.h>
#include <iostream>
static std::uint32_t const MAX_RETRIES = 5;
static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60);
static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000;
static void
wait(
boost::asio::steady_timer& timer,
std::string const& reason,
std::chrono::seconds timeout = WAIT_TIME)
class Step1Impl
{
clio::LogService::info() << reason << ". Waiting then retrying";
timer.expires_after(timeout);
timer.wait();
clio::LogService::info() << "Done waiting";
}
std::string tag_;
std::shared_ptr<Backend::CassandraBackend> backend_;
std::reference_wrapper<ResumeContextProvider> resumeProvider_;
boost::json::object resumeData_;
static std::optional<boost::json::object>
doRequestFromRippled(
clio::Config const& config,
boost::json::object const& request)
{
auto source = config.array("etl_sources").at(0);
auto const ip = source.value<std::string>("ip");
auto const wsPort = source.value<std::string>("ws_port");
boost::asio::steady_timer timer_;
std::optional<std::string> repairAddress_;
clio::LogService::debug()
<< "Attempting to forward request to tx. "
<< "request = " << boost::json::serialize(request);
boost::json::object response;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
try
public:
Step1Impl(
std::string tag,
boost::asio::io_context& ioc,
std::shared_ptr<Backend::CassandraBackend> backend,
ResumeContextProvider& resumeProvider,
boost::json::object resumeData,
std::optional<std::string> repairAddress)
: tag_{std::move(tag)}
, backend_{backend}
, resumeProvider_{std::ref(resumeProvider)}
, resumeData_{std::move(resumeData)}
, timer_{ioc}
, repairAddress_{repairAddress}
{
boost::asio::io_context ioc;
tcp::resolver resolver{ioc};
}
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(ioc);
auto const results = resolver.resolve(ip, wsPort);
void
perform(
boost::asio::yield_context yield,
Backend::LedgerRange const& ledgerRange)
{
/*
* Step 1 - Look at all NFT transactions recorded in
* `nf_token_transactions` and reload any NFTokenMint transactions.
* These will contain the URI of any tokens that were minted after our
* start sequence. We look at transactions for this step instead of
* directly at the tokens in `nf_tokens` because we also want to cover
* the extreme edge case of a token that is re-minted with a different
* URI.
*/
std::vector<NFTsData> toWrite;
ws->next_layer().expires_after(std::chrono::seconds(15));
ws->next_layer().connect(results);
clio::LogService::info() << "Running " << tag_;
resumeProvider_.get().write({tag_, {}}); // at the start of step1
ws->handshake(ip, "/");
ws->write(net::buffer(boost::json::serialize(request)));
std::stringstream query;
query << "SELECT hash FROM " << backend_->tablePrefix()
<< "nf_token_transactions";
CassStatement* nftTxQuery = cass_statement_new(query.str().c_str(), 0);
cass_statement_set_paging_size(nftTxQuery, 1000);
cass_bool_t morePages = cass_true;
beast::flat_buffer buffer;
ws->read(buffer);
auto begin = static_cast<char const*>(buffer.data().data());
auto end = begin + buffer.data().size();
auto parsed = boost::json::parse(std::string(begin, end));
if (!parsed.is_object())
if (not resumeData_.empty() and resumeData_.contains("token") and
resumeData_.at("token").is_string() and
not resumeData_.at("token").as_string().empty())
{
clio::LogService::error()
<< "Error parsing response: " << std::string{begin, end};
return {};
clio::LogService::info() << " -- Restoring previous state..";
auto encodedState =
std::string{resumeData_.at("token").as_string().c_str()};
auto state = ripple::base64_decode(encodedState);
cass_statement_set_paging_state_token(
nftTxQuery, state.c_str(), state.size());
clio::LogService::info()
<< " Resuming from page " << encodedState;
}
return parsed.as_object();
}
catch (std::exception const& e)
{
clio::LogService::fatal() << "Encountered exception : " << e.what();
return {};
}
}
static std::optional<boost::json::object>
requestFromRippled(
boost::asio::steady_timer& timer,
clio::Config const& config,
boost::json::object const& request,
std::uint32_t const attempts = 0)
{
auto response = doRequestFromRippled(config, request);
if (response.has_value())
return response;
if (attempts >= MAX_RETRIES)
return std::nullopt;
wait(timer, "Failed to request from rippled", std::chrono::seconds{1});
return requestFromRippled(timer, config, request, attempts + 1);
}
static std::string
hexStringToBinaryString(std::string hex)
{
auto blob = ripple::strUnHex(hex);
std::string strBlob;
for (auto c : *blob)
strBlob += c;
return strBlob;
}
static void
maybeWriteTransaction(
Backend::CassandraBackend& backend,
std::optional<boost::json::object> const& tx)
{
if (!tx.has_value())
throw std::runtime_error("Could not repair transaction");
auto package = tx.value();
if (!package.contains("result") || !package.at("result").is_object() ||
package.at("result").as_object().contains("error"))
throw std::runtime_error("Received non-success response from rippled");
auto data = package.at("result").as_object();
auto const date = data.at("date").as_int64();
auto const ledgerIndex = data.at("ledger_index").as_int64();
auto hashStr = hexStringToBinaryString(data.at("hash").as_string().c_str());
auto metaStr = hexStringToBinaryString(data.at("meta").as_string().c_str());
auto txStr = hexStringToBinaryString(data.at("tx").as_string().c_str());
backend.writeTransaction(
std::move(hashStr),
ledgerIndex,
date,
std::move(txStr),
std::move(metaStr));
backend.sync();
}
static void
repairCorruptedTx(
boost::asio::steady_timer& timer,
clio::Config const& config,
Backend::CassandraBackend& backend,
ripple::uint256 const& hash)
{
clio::LogService::info() << " - repairing " << hash;
auto const data = requestFromRippled(
timer,
config,
// For all NFT txs, paginated in groups of 1000...
while (morePages)
{
{"method", "tx"},
{"transaction", to_string(hash)},
{"binary", true},
});
CassResult const* result =
doTryGetTxPageResult(nftTxQuery, timer_, backend_);
maybeWriteTransaction(backend, data);
}
std::vector<ripple::uint256> txHashes;
static std::vector<NFTsData>
doNFTWrite(
std::vector<NFTsData>& nfts,
Backend::CassandraBackend& backend,
std::string const& tag)
{
auto const size = nfts.size();
if (size == 0)
return nfts;
backend.writeNFTs(std::move(nfts));
backend.sync();
clio::LogService::info() << tag << ": Wrote " << size << " records";
return {};
}
static std::vector<NFTsData>
maybeDoNFTWrite(
std::vector<NFTsData>& nfts,
Backend::CassandraBackend& backend,
std::string const& tag)
{
if (nfts.size() < NFT_WRITE_BATCH_SIZE)
return nfts;
return doNFTWrite(nfts, backend, tag);
}
static std::vector<Backend::TransactionAndMetadata>
doTryFetchTransactions(
boost::asio::steady_timer& timer,
Backend::CassandraBackend& backend,
std::vector<ripple::uint256> const& hashes,
boost::asio::yield_context& yield,
std::uint32_t const attempts = 0)
{
try
{
return backend.fetchTransactions(hashes, yield);
}
catch (Backend::DatabaseTimeout const& e)
{
if (attempts >= MAX_RETRIES)
throw e;
wait(timer, "Transactions read error");
return doTryFetchTransactions(
timer, backend, hashes, yield, attempts + 1);
}
}
static Backend::LedgerPage
doTryFetchLedgerPage(
boost::asio::steady_timer& timer,
Backend::CassandraBackend& backend,
std::optional<ripple::uint256> const& cursor,
std::uint32_t const sequence,
boost::asio::yield_context& yield,
std::uint32_t const attempts = 0)
{
try
{
return backend.fetchLedgerPage(cursor, sequence, 10000, false, yield);
}
catch (Backend::DatabaseTimeout const& e)
{
if (attempts >= MAX_RETRIES)
throw e;
wait(timer, "Page read error");
return doTryFetchLedgerPage(
timer, backend, cursor, sequence, yield, attempts + 1);
}
}
static const CassResult*
doTryGetTxPageResult(
CassStatement* const query,
boost::asio::steady_timer& timer,
Backend::CassandraBackend& backend,
std::uint32_t const attempts = 0)
{
CassFuture* fut = cass_session_execute(backend.cautionGetSession(), query);
CassResult const* result = cass_future_get_result(fut);
cass_future_free(fut);
if (result != nullptr)
return result;
if (attempts >= MAX_RETRIES)
throw std::runtime_error("Already retried too many times");
wait(timer, "Unexpected empty result from tx paging");
return doTryGetTxPageResult(query, timer, backend, attempts + 1);
}
static void
doMigrationStepOne(
clio::Config const& config,
Backend::CassandraBackend& backend,
boost::asio::steady_timer& timer,
boost::asio::yield_context& yield,
Backend::LedgerRange const& ledgerRange,
bool repairEnabled = false)
{
/*
* Step 1 - Look at all NFT transactions recorded in
* `nf_token_transactions` and reload any NFTokenMint transactions. These
* will contain the URI of any tokens that were minted after our start
* sequence. We look at transactions for this step instead of directly at
* the tokens in `nf_tokens` because we also want to cover the extreme
* edge case of a token that is re-minted with a different URI.
*/
std::string const stepTag = "Step 1 - transaction loading";
std::vector<NFTsData> toWrite;
std::stringstream query;
query << "SELECT hash FROM " << backend.tablePrefix()
<< "nf_token_transactions";
CassStatement* nftTxQuery = cass_statement_new(query.str().c_str(), 0);
cass_statement_set_paging_size(nftTxQuery, 1000);
cass_bool_t morePages = cass_true;
// For all NFT txs, paginated in groups of 1000...
while (morePages)
{
CassResult const* result =
doTryGetTxPageResult(nftTxQuery, timer, backend);
std::vector<ripple::uint256> txHashes;
// For each tx in page...
CassIterator* txPageIterator = cass_iterator_from_result(result);
while (cass_iterator_next(txPageIterator))
{
cass_byte_t const* buf;
std::size_t bufSize;
CassError const rc = cass_value_get_bytes(
cass_row_get_column(cass_iterator_get_row(txPageIterator), 0),
&buf,
&bufSize);
if (rc != CASS_OK)
// For each tx in page...
CassIterator* txPageIterator = cass_iterator_from_result(result);
while (cass_iterator_next(txPageIterator))
{
cass_iterator_free(txPageIterator);
cass_result_free(result);
cass_statement_free(nftTxQuery);
throw std::runtime_error(
"Could not retrieve hash from nf_token_transactions");
cass_byte_t const* buf;
std::size_t bufSize;
CassError const rc = cass_value_get_bytes(
cass_row_get_column(
cass_iterator_get_row(txPageIterator), 0),
&buf,
&bufSize);
if (rc != CASS_OK)
{
cass_iterator_free(txPageIterator);
cass_result_free(result);
cass_statement_free(nftTxQuery);
throw std::runtime_error(
"Could not retrieve hash from nf_token_transactions");
}
txHashes.push_back(ripple::uint256::fromVoid(buf));
}
txHashes.push_back(ripple::uint256::fromVoid(buf));
}
auto txs =
doTryFetchTransactions(timer_, backend_, txHashes, yield);
if (txs.size() != txHashes.size())
throw std::runtime_error(
"Amount of hashes does not match amount of retrieved "
"transactions");
auto txs = doTryFetchTransactions(timer, backend, txHashes, yield);
if (txs.size() != txHashes.size())
throw std::runtime_error(
"Amount of hashes does not match amount of retrieved "
"transactions");
for (int32_t idx = 0; idx < txHashes.size(); ++idx)
{
auto const& tx = txs.at(idx);
auto const& hash = txHashes.at(idx);
if (tx.ledgerSequence > ledgerRange.maxSequence)
continue;
try
for (int32_t idx = 0; idx < txHashes.size(); ++idx)
{
ripple::STTx const sttx{ripple::SerialIter{
tx.transaction.data(), tx.transaction.size()}};
if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT)
auto const& tx = txs.at(idx);
auto const& hash = txHashes.at(idx);
if (tx.ledgerSequence > ledgerRange.maxSequence)
continue;
ripple::TxMeta const txMeta{
sttx.getTransactionID(), tx.ledgerSequence, tx.metadata};
toWrite.push_back(
std::get<1>(getNFTDataFromTx(txMeta, sttx)).value());
try
{
ripple::STTx const sttx{ripple::SerialIter{
tx.transaction.data(), tx.transaction.size()}};
if (sttx.getTxnType() != ripple::TxType::ttNFTOKEN_MINT)
continue;
ripple::TxMeta const txMeta{
sttx.getTransactionID(),
tx.ledgerSequence,
tx.metadata};
toWrite.push_back(
std::get<1>(getNFTDataFromTx(txMeta, sttx)).value());
}
catch (std::exception const& e)
{
clio::LogService::warn()
<< "Corrupted tx detected: " << hash;
std::cerr << "Corrupted tx detected: " << hash << std::endl;
if (not repairAddress_.has_value())
{
clio::LogService::fatal()
<< "Not attempting to repair. Rerun with `--repair "
"[host:port]` to repair corrupted transactions.";
exit(-1);
}
repairCorruptedTx(timer_, *repairAddress_, backend_, hash);
auto maybeTx = backend_->fetchTransaction(hash, yield);
if (!maybeTx.has_value())
{
clio::LogService::fatal()
<< "Could not fetch written transaction for hash "
<< hash << "; Repair failed.";
exit(-1);
}
txs[idx] = maybeTx.value();
--idx; // repeat the try section for the repaired tx
std::cerr << "+ tx repaired: " << hash << std::endl;
}
}
catch (std::exception const& e)
toWrite = maybeDoNFTWrite(toWrite, backend_, tag_);
morePages = cass_result_has_more_pages(result);
if (morePages)
{
clio::LogService::warn() << "Corrupted tx detected: " << hash;
std::cerr << "Corrupted tx detected: " << hash << std::endl;
char const* state = nullptr;
std::size_t sz;
cass_result_paging_state_token(result, &state, &sz);
cass_statement_set_paging_state_token(nftTxQuery, state, sz);
if (not repairEnabled)
// only update resume token if data was actually written to DB
if (toWrite.empty())
{
clio::LogService::fatal()
<< "Not attempting to repair. Rerun with -repair to "
"repair corrupted transactions.";
exit(-1);
resumeProvider_.get().write(
{tag_,
{{"token",
ripple::base64_encode(std::string{state, sz})}}});
}
repairCorruptedTx(timer, config, backend, hash);
auto maybeTx = backend.fetchTransaction(hash, yield);
if (!maybeTx.has_value())
{
clio::LogService::fatal()
<< "Could not fetch written transaction for hash "
<< hash << "; Repair failed.";
exit(-1);
}
txs[idx] = maybeTx.value();
--idx; // repeat the try section for the repaired tx
std::cerr << "+ tx repaired: " << hash << std::endl;
}
cass_iterator_free(txPageIterator);
cass_result_free(result);
}
toWrite = maybeDoNFTWrite(toWrite, backend, stepTag);
cass_statement_free(nftTxQuery);
doNFTWrite(toWrite, backend_, tag_);
}
};
morePages = cass_result_has_more_pages(result);
if (morePages)
cass_statement_set_paging_state(nftTxQuery, result);
cass_iterator_free(txPageIterator);
cass_result_free(result);
class Step2Impl
{
std::string tag_;
std::shared_ptr<Backend::CassandraBackend> backend_;
std::reference_wrapper<ResumeContextProvider> resumeProvider_;
boost::json::object resumeData_;
boost::asio::steady_timer timer_;
public:
Step2Impl(
std::string tag,
boost::asio::io_context& ioc,
std::shared_ptr<Backend::CassandraBackend> backend,
ResumeContextProvider& resumeProvider,
boost::json::object resumeData)
: tag_{std::move(tag)}
, backend_{backend}
, resumeProvider_{std::ref(resumeProvider)}
, resumeData_{std::move(resumeData)}
, timer_{ioc}
{
}
cass_statement_free(nftTxQuery);
doNFTWrite(toWrite, backend, stepTag);
}
static void
doMigrationStepTwo(
Backend::CassandraBackend& backend,
boost::asio::steady_timer& timer,
boost::asio::yield_context& yield,
Backend::LedgerRange const& ledgerRange)
{
/*
* Step 2 - Pull every object from our initial ledger and load all NFTs
* found in any NFTokenPage object. Prior to this migration, we were not
* pulling out NFTs from the initial ledger, so all these NFTs would be
* missed. This will also record the URI of any NFTs minted prior to the
* start sequence.
*/
std::string const stepTag = "Step 2 - initial ledger loading";
std::vector<NFTsData> toWrite;
std::optional<ripple::uint256> cursor;
// For each object page in initial ledger
do
void
perform(
boost::asio::yield_context yield,
Backend::LedgerRange const& ledgerRange)
{
auto const page = doTryFetchLedgerPage(
timer, backend, cursor, ledgerRange.minSequence, yield);
/*
* Step 2 - Pull every object from our initial ledger and load all NFTs
* found in any NFTokenPage object. Prior to this migration, we were not
* pulling out NFTs from the initial ledger, so all these NFTs would be
* missed. This will also record the URI of any NFTs minted prior to the
* start sequence.
*/
std::vector<NFTsData> toWrite;
std::optional<ripple::uint256> cursor;
// For each object in page
for (auto const& object : page.objects)
clio::LogService::info() << "Running " << tag_;
resumeProvider_.get().write({tag_, {}}); // at the start of step2
if (not resumeData_.empty() and resumeData_.contains("cursor") and
resumeData_.at("cursor").is_string() and
not resumeData_.at("cursor").as_string().empty())
{
auto const objectNFTs = getNFTDataFromObj(
ledgerRange.minSequence,
std::string(object.key.begin(), object.key.end()),
std::string(object.blob.begin(), object.blob.end()));
toWrite.insert(toWrite.end(), objectNFTs.begin(), objectNFTs.end());
clio::LogService::info() << " -- Restoring previous state..";
cursor =
ripple::strUnHex(resumeData_.at("cursor").as_string().c_str());
clio::LogService::info() << " Resuming from " << *cursor;
}
toWrite = maybeDoNFTWrite(toWrite, backend, stepTag);
cursor = page.cursor;
} while (cursor.has_value());
// For each object page in initial ledger
do
{
auto const page = doTryFetchLedgerPage(
timer_, backend_, cursor, ledgerRange.minSequence, yield);
doNFTWrite(toWrite, backend, stepTag);
}
// For each object in page
for (auto const& object : page.objects)
{
auto const objectNFTs = getNFTDataFromObj(
ledgerRange.minSequence,
std::string(object.key.begin(), object.key.end()),
std::string(object.blob.begin(), object.blob.end()));
toWrite.insert(
toWrite.end(), objectNFTs.begin(), objectNFTs.end());
}
static void
doMigrationStepThree(Backend::CassandraBackend& backend)
toWrite = maybeDoNFTWrite(toWrite, backend_, tag_);
cursor = page.cursor;
// only update resume token if data was actually written to DB
if (cursor.has_value() && toWrite.empty())
resumeProvider_.get().write(
{tag_, {{"cursor", std::string{ripple::strHex(*cursor)}}}});
} while (cursor.has_value());
doNFTWrite(toWrite, backend_, tag_);
}
};
class Step3Impl
{
/*
* Step 3 - Drop the old `issuer_nf_tokens` table, which is replaced by
* `issuer_nf_tokens_v2`. Normally, we should probably not drop old tables
* in migrations, but here it is safe since the old table wasn't yet being
* used to serve any data anyway.
*/
std::stringstream query;
query << "DROP TABLE " << backend.tablePrefix() << "issuer_nf_tokens";
CassStatement* issuerDropTableQuery =
cass_statement_new(query.str().c_str(), 0);
CassFuture* fut =
cass_session_execute(backend.cautionGetSession(), issuerDropTableQuery);
CassError const rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(issuerDropTableQuery);
backend.sync();
std::string tag_;
std::shared_ptr<Backend::CassandraBackend> backend_;
std::reference_wrapper<ResumeContextProvider> resumeProvider_;
if (rc != CASS_OK)
clio::LogService::warn() << "Could not drop old issuer_nf_tokens "
"table. If it still exists, "
"you should drop it yourself\n";
}
static void
doMigration(
clio::Config const& config,
Backend::CassandraBackend& backend,
boost::asio::steady_timer& timer,
boost::asio::yield_context& yield,
bool repairEnabled = false)
{
clio::LogService::info() << "Beginning migration";
auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield);
/*
* Step 0 - If we haven't downloaded the initial ledger yet, just short
* circuit.
*/
if (!ledgerRange)
public:
Step3Impl(
std::string tag,
std::shared_ptr<Backend::CassandraBackend> backend,
ResumeContextProvider& resumeProvider)
: tag_{std::move(tag)}
, backend_{backend}
, resumeProvider_{std::ref(resumeProvider)}
{
clio::LogService::info() << "There is no data to migrate";
return;
}
doMigrationStepOne(
config, backend, timer, yield, *ledgerRange, repairEnabled);
clio::LogService::info() << "\nStep 1 done!\n";
void
perform()
{
/*
* Step 3 - Drop the old `issuer_nf_tokens` table, which is replaced by
* `issuer_nf_tokens_v2`. Normally, we should probably not drop old
* tables in migrations, but here it is safe since the old table wasn't
* yet being used to serve any data anyway.
*/
clio::LogService::info() << "Running " << tag_;
resumeProvider_.get().write({tag_, {}}); // at the start of step3
doMigrationStepTwo(backend, timer, yield, *ledgerRange);
clio::LogService::info() << "\nStep 2 done!\n";
std::stringstream query;
query << "DROP TABLE " << backend_->tablePrefix() << "issuer_nf_tokens";
CassStatement* issuerDropTableQuery =
cass_statement_new(query.str().c_str(), 0);
CassFuture* fut = cass_session_execute(
backend_->cautionGetSession(), issuerDropTableQuery);
CassError const rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(issuerDropTableQuery);
backend_->sync();
doMigrationStepThree(backend);
clio::LogService::info() << "\nStep 3 done!\n";
clio::LogService::info()
<< "\nCompleted migration from " << ledgerRange->minSequence << " to "
<< ledgerRange->maxSequence << "!\n";
}
if (rc != CASS_OK)
clio::LogService::warn() << "Could not drop old issuer_nf_tokens "
"table. If it still exists, "
"you should drop it yourself\n";
}
};
static void
usage()
{
std::cerr << "\nUsage:\n"
<< " with repair: clio_migrator path/to/config -repair 2> "
"repair.log\n"
<< " without repair: clio_migrator path/to/config" << std::endl;
<< " without repair: clio_migrator path/to/config\n"
<< " with repair: clio_migrator path/to/config --repair "
"127.0.0.1:6006 2> repair.log\n"
<< "resume previous run: clio_migrator path/to/config --resume\n"
<< " use both together: clio_migrator path/to/config -Rr "
"192.168.0.10:51233"
<< std::endl;
}
int
main(int argc, char* argv[])
try
{
if (argc < 2)
namespace po = boost::program_options;
auto repairAddress = std::optional<std::string>{};
auto resumeEnabled = false;
// clang-format off
po::options_description description("Options");
description.add_options()
("help,h", "print help message and exit")
("resume,R", "attempt to resume with previous progress")
("repair,r", po::value<std::string>(), "specify repair server. format: `host:port`")
("conf,c", po::value<std::string>(), "specify a configuration file")
;
// clang-format on
po::positional_options_description positional;
positional.add("conf", 1);
po::variables_map parsed;
po::store(
po::command_line_parser(argc, argv)
.options(description)
.positional(positional)
.run(),
parsed);
po::notify(parsed);
if (parsed.count("conf") == 0u)
{
std::cerr << "Didn't provide config path." << std::endl;
std::cout << description << std::endl;
usage();
return EXIT_FAILURE;
std::exit(EXIT_FAILURE);
}
auto repairEnabled = false;
if (argc >= 3)
if (parsed.count("help") != 0u)
{
if (not boost::iequals(argv[2], "-repair"))
{
std::cerr << "Final argument must be `-repair`." << std::endl;
usage();
return EXIT_FAILURE;
}
clio::LogService::info()
<< "Enabling REPAIR mode. Missing/broken transactions will be "
"downloaded from rippled and overwritten.";
repairEnabled = true;
std::cout << description << std::endl;
usage();
std::exit(EXIT_SUCCESS);
}
std::string const configPath = argv[1];
std::string const configPath = parsed["conf"].as<std::string>();
auto const config = clio::ConfigReader::open(configPath);
if (!config)
{
@@ -543,20 +410,75 @@ main(int argc, char* argv[])
return EXIT_FAILURE;
}
boost::asio::io_context ioc;
boost::asio::steady_timer timer{ioc};
auto workGuard = boost::asio::make_work_guard(ioc);
auto backend = Backend::make_Backend(ioc, config);
if (parsed.count("repair") != 0u)
{
repairAddress = parsed["repair"].as<std::string>();
parseHostPort(*repairAddress); // throws on wrong format
boost::asio::spawn(
clio::LogService::info()
<< "Enabling REPAIR mode. Missing/broken transactions will be "
"downloaded from Clio/rippled at "
<< *repairAddress << " and overwritten.";
}
if (parsed.count("resume") != 0u)
{
resumeEnabled = true;
clio::LogService::info()
<< "Enabling RESUME mode. Will attempt to restore previously saved "
"state from `resume.json`.";
}
boost::asio::io_context ioc;
auto backend = Backend::make_Backend(ioc, config);
auto resumeProvider = ResumeContextProvider(
std::filesystem::current_path() / "resume.json", resumeEnabled);
auto migrator = Migrator{
ioc,
[&config, &backend, &workGuard, &timer, &repairEnabled](
boost::asio::yield_context yield) {
doMigration(config, *backend, timer, yield, repairEnabled);
workGuard.reset();
});
config,
backend,
resumeProvider,
{
Step(
"Step 1 - transaction loading",
[&](auto tag,
auto yield,
auto const& ledgerRange,
auto resumeData) {
Step1Impl(
tag,
ioc,
backend,
resumeProvider,
resumeData,
repairAddress)
.perform(yield, ledgerRange);
}),
Step(
"Step 2 - initial ledger loading",
[&](auto tag,
auto yield,
auto const& ledgerRange,
auto resumeData) {
Step2Impl(tag, ioc, backend, resumeProvider, resumeData)
.perform(yield, ledgerRange);
}),
Step(
"Step 3 - cleanup",
[&](auto tag,
auto yield,
auto const& ledgerRange,
auto resumeData) {
Step3Impl(tag, backend, resumeProvider).perform();
}),
}};
ioc.run();
clio::LogService::info() << "SUCCESS!";
return EXIT_SUCCESS;
}
catch (std::exception const& ex)
{
std::cerr << ex.what() << std::endl;
}

View File

@@ -0,0 +1,295 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <backend/CassandraBackend.h>
#include <config/Config.h>
#include <log/Logger.h>
#include <ripple/basics/base64.h>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/json.hpp>
#include <cassandra.h>
#include <filesystem>
#include <optional>
#include <string>
// Upper bound on the `attempts` counter used by the retrying helpers below;
// once it is reached they stop retrying.
static std::uint32_t const MAX_RETRIES = 5;
// Default pause applied by `wait` when the caller does not pass a timeout.
static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60);
// Number of buffered NFT records at which `maybeDoNFTWrite` flushes to the DB.
static std::uint32_t const NFT_WRITE_BATCH_SIZE = 10000;
/**
 * Pauses the caller before a retry and logs why.
 *
 * @param timer Timer used to perform the wait
 * @param reason Text logged in front of the "Waiting then retrying" notice
 * @param timeout How long to pause; defaults to WAIT_TIME
 */
static void
wait(
    boost::asio::steady_timer& timer,
    std::string const& reason,
    std::chrono::seconds timeout = WAIT_TIME)
{
    clio::LogService::info() << reason << ". Waiting then retrying";

    // Arm the timer, then wait on it synchronously.
    timer.expires_after(timeout);
    timer.wait();

    clio::LogService::info() << "Done waiting";
}
/**
 * Splits a `host:port` string into its two components.
 *
 * The input must contain exactly one `:` that separates a non-empty host
 * from a non-empty port. Inputs with an empty component (`host:` or `:port`)
 * are rejected here instead of failing later when the address is used.
 *
 * @param input Address in `host:port` form, e.g. `127.0.0.1:6006`
 * @return Pair of (host, port) strings
 * @throws std::logic_error if the input does not match the expected format
 */
static std::pair<std::string, std::string>
parseHostPort(std::string input)
{
    auto const separator = input.find(':');
    auto const wellFormed = separator != std::string::npos &&
        separator == input.rfind(':') &&  // exactly one separator
        separator != 0 &&                 // host is not empty
        separator + 1 != input.size();    // port is not empty

    if (!wellFormed)
        throw std::logic_error(
            "Host and port must be specified as `host:port` string. Got "
            "instead: `" +
            input + "`");

    return std::make_pair(
        input.substr(0, separator), input.substr(separator + 1));
}
static std::optional<boost::json::object>
doRequestFromRippled(
std::string repairAddress,
boost::json::object const& request)
{
auto const [ip, wsPort] = parseHostPort(repairAddress);
clio::LogService::debug()
<< "Attempting to forward request to repair server. "
<< "request = " << boost::json::serialize(request);
boost::json::object response;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
try
{
boost::asio::io_context ioc;
tcp::resolver resolver{ioc};
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(ioc);
auto const results = resolver.resolve(ip, wsPort);
ws->next_layer().expires_after(std::chrono::seconds(15));
ws->next_layer().connect(results);
ws->handshake(ip, "/");
ws->write(net::buffer(boost::json::serialize(request)));
beast::flat_buffer buffer;
ws->read(buffer);
auto begin = static_cast<char const*>(buffer.data().data());
auto end = begin + buffer.data().size();
auto parsed = boost::json::parse(std::string(begin, end));
if (!parsed.is_object())
{
clio::LogService::error()
<< "Error parsing response: " << std::string{begin, end};
return {};
}
return parsed.as_object();
}
catch (std::exception const& e)
{
clio::LogService::fatal() << "Encountered exception : " << e.what();
return {};
}
}
/**
 * Sends `request` to the repair server, retrying on failure.
 *
 * The request is re-sent until a response is obtained or the attempt counter
 * reaches MAX_RETRIES, pausing for one second between tries.
 *
 * @param timer Timer used for the pause between tries
 * @param repairAddress Server address in `host:port` form
 * @param request JSON request to send
 * @param attempts Starting value of the attempt counter
 * @return The response object, or std::nullopt once retries are exhausted
 */
static std::optional<boost::json::object>
requestFromRippled(
    boost::asio::steady_timer& timer,
    std::string repairAddress,
    boost::json::object const& request,
    std::uint32_t const attempts = 0)
{
    for (auto attempt = attempts;; ++attempt)
    {
        auto reply = doRequestFromRippled(repairAddress, request);
        if (reply.has_value())
            return reply;

        if (attempt >= MAX_RETRIES)
            return std::nullopt;

        wait(
            timer,
            "Failed to request from rippled",
            std::chrono::seconds{1});
    }
}
/**
 * Decodes a hex string into a string holding the raw bytes.
 *
 * @param hex Hex-encoded input
 * @return String whose characters are the decoded bytes
 * @throws std::runtime_error if `ripple::strUnHex` cannot decode the input
 */
static std::string
hexStringToBinaryString(std::string hex)
{
    auto const blob = ripple::strUnHex(hex);

    // strUnHex yields an empty result for undecodable input; dereferencing
    // it unchecked would be undefined behaviour.
    if (!blob)
        throw std::runtime_error("Could not decode hex string");

    return std::string(blob->begin(), blob->end());
}
/**
 * Writes a transaction obtained from the repair server to the database.
 *
 * The response must contain an object under `result` without an `error`
 * member. Its `hash`, `meta` and `tx` fields are hex-decoded and written
 * together with `ledger_index` and `date`; the backend is then synced.
 *
 * @param backend Backend to write the transaction to
 * @param tx Response from the repair server, if one was received
 * @throws std::runtime_error if there is no response or it is not a success
 *         response; accessing a missing or mistyped field also throws
 */
static void
maybeWriteTransaction(
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    std::optional<boost::json::object> const& tx)
{
    if (!tx.has_value())
        throw std::runtime_error("Could not repair transaction");

    // Work on references: the response is only read, so deep copies of the
    // JSON tree are unnecessary.
    auto const& package = *tx;
    if (!package.contains("result") || !package.at("result").is_object() ||
        package.at("result").as_object().contains("error"))
        throw std::runtime_error("Received non-success response from rippled");

    auto const& data = package.at("result").as_object();
    auto const date = data.at("date").as_int64();
    auto const ledgerIndex = data.at("ledger_index").as_int64();
    auto hashStr = hexStringToBinaryString(data.at("hash").as_string().c_str());
    auto metaStr = hexStringToBinaryString(data.at("meta").as_string().c_str());
    auto txStr = hexStringToBinaryString(data.at("tx").as_string().c_str());

    backend->writeTransaction(
        std::move(hashStr),
        ledgerIndex,
        date,
        std::move(txStr),
        std::move(metaStr));
    backend->sync();
}
/**
 * Re-downloads the transaction with the given hash from the repair server
 * and writes it to the database.
 *
 * @param timer Timer used for pauses between request retries
 * @param repairAddress Server address in `host:port` form
 * @param backend Backend the repaired transaction is written to
 * @param hash Hash of the transaction to repair
 * @throws std::runtime_error (via maybeWriteTransaction) if no usable
 *         response was obtained
 */
static void
repairCorruptedTx(
    boost::asio::steady_timer& timer,
    std::string repairAddress,
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    ripple::uint256 const& hash)
{
    clio::LogService::info() << " - repairing " << hash;

    // `tx` request in binary mode for the transaction in question.
    boost::json::object const txRequest{
        {"method", "tx"},
        {"transaction", to_string(hash)},
        {"binary", true},
    };

    auto const reply = requestFromRippled(timer, repairAddress, txRequest);
    maybeWriteTransaction(backend, reply);
}
/**
 * @brief Writes all accumulated NFT records to the backend and syncs it.
 *
 * @param nfts Records to write; passed to the backend as an rvalue when not
 * empty, so it must be treated as moved-from afterwards
 * @param backend Backend to write to
 * @param tag Prefix for the log line reporting the number of written records
 * @return A copy of `nfts` if there was nothing to write, otherwise an empty
 * vector
 */
static std::vector<NFTsData>
doNFTWrite(
    std::vector<NFTsData>& nfts,
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    std::string const& tag)
{
    if (nfts.empty())
        return nfts;

    // Remember the count before the vector is handed over to the backend.
    auto const recordCount = nfts.size();
    backend->writeNFTs(std::move(nfts));
    backend->sync();

    clio::LogService::info() << tag << ": Wrote " << recordCount << " records";
    return std::vector<NFTsData>{};
}
/**
 * @brief Flushes the NFT records once a full batch has been collected.
 *
 * @param nfts Records collected so far
 * @param backend Backend to write to
 * @param tag Prefix used for logging by doNFTWrite
 * @return A copy of `nfts` while fewer than NFT_WRITE_BATCH_SIZE records are
 * pending, otherwise the result of doNFTWrite
 */
static std::vector<NFTsData>
maybeDoNFTWrite(
    std::vector<NFTsData>& nfts,
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    std::string const& tag)
{
    bool const batchIsFull = nfts.size() >= NFT_WRITE_BATCH_SIZE;
    if (batchIsFull)
        return doNFTWrite(nfts, backend, tag);
    return nfts;
}
/**
 * @brief Fetches transactions from the backend, retrying on database timeout.
 *
 * @param timer Timer handed to wait() between attempts
 * @param backend Backend to read from
 * @param hashes Hashes of the transactions to fetch
 * @param yield Coroutine context forwarded to the backend
 * @param attempts Number of retries already consumed; callers normally omit it
 * @return The fetched transactions
 * @throws Backend::DatabaseTimeout if the read still times out after
 * MAX_RETRIES retries
 */
static std::vector<Backend::TransactionAndMetadata>
doTryFetchTransactions(
    boost::asio::steady_timer& timer,
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    std::vector<ripple::uint256> const& hashes,
    boost::asio::yield_context& yield,
    std::uint32_t const attempts = 0)
{
    for (auto retriesUsed = attempts;; ++retriesUsed)
    {
        try
        {
            return backend->fetchTransactions(hashes, yield);
        }
        catch (Backend::DatabaseTimeout const&)
        {
            // Rethrow the exception in flight instead of throwing a copy.
            if (retriesUsed >= MAX_RETRIES)
                throw;
        }

        // The retry runs after the handler has been left, so neither the
        // wait nor the next fetch executes while the exception is active.
        wait(timer, "Transactions read error");
    }
}
/**
 * @brief Fetches one ledger page from the backend, retrying on database
 * timeout.
 *
 * Each request asks the backend for up to 10000 entries.
 *
 * @param timer Timer handed to wait() between attempts
 * @param backend Backend to read from
 * @param cursor Position to continue paging from, if any
 * @param sequence Ledger sequence to read the page for
 * @param yield Coroutine context forwarded to the backend
 * @param attempts Number of retries already consumed; callers normally omit it
 * @return The fetched page
 * @throws Backend::DatabaseTimeout if the read still times out after
 * MAX_RETRIES retries
 */
static Backend::LedgerPage
doTryFetchLedgerPage(
    boost::asio::steady_timer& timer,
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    std::optional<ripple::uint256> const& cursor,
    std::uint32_t const sequence,
    boost::asio::yield_context& yield,
    std::uint32_t const attempts = 0)
{
    for (auto retriesUsed = attempts;; ++retriesUsed)
    {
        try
        {
            return backend->fetchLedgerPage(
                cursor, sequence, 10000, false, yield);
        }
        catch (Backend::DatabaseTimeout const&)
        {
            // Rethrow the exception in flight instead of throwing a copy.
            if (retriesUsed >= MAX_RETRIES)
                throw;
        }

        // The retry runs after the handler has been left, so neither the
        // wait nor the next fetch executes while the exception is active.
        wait(timer, "Page read error");
    }
}
/**
 * @brief Executes a statement on the backend's session and returns its
 * result, retrying while no result is produced.
 *
 * @param query Statement to execute
 * @param timer Timer handed to wait() between attempts
 * @param backend Backend providing the Cassandra session
 * @param attempts Number of retries already consumed; callers normally omit it
 * @return A non-null result. NOTE(review): per the DataStax driver
 * documentation the result must be released with cass_result_free; the
 * release is not visible in this function, so confirm against the callers.
 * @throws std::runtime_error if no result was obtained after MAX_RETRIES
 * retries
 */
static const CassResult*
doTryGetTxPageResult(
    CassStatement* const query,
    boost::asio::steady_timer& timer,
    std::shared_ptr<Backend::CassandraBackend> const& backend,
    std::uint32_t const attempts = 0)
{
    for (auto retriesUsed = attempts;; ++retriesUsed)
    {
        CassFuture* const pending =
            cass_session_execute(backend->cautionGetSession(), query);
        CassResult const* const page = cass_future_get_result(pending);
        cass_future_free(pending);

        if (page != nullptr)
            return page;
        if (retriesUsed >= MAX_RETRIES)
            throw std::runtime_error("Already retried too many times");

        wait(timer, "Unexpected empty result from tx paging");
    }
}

View File

@@ -0,0 +1,215 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <backend/CassandraBackend.h>
#include <config/Config.h>
#include <log/Logger.h>
#include <ripple/basics/base64.h>
#include <boost/json.hpp>
#include <algorithm>
#include <filesystem>
#include <fstream>
#include <functional>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
/**
 * @brief Progress information that lets an interrupted migration continue.
 */
struct ResumeContext
{
    // Tag of the step the state belongs to
    std::string tag;
    // Step-specific state
    boost::json::object data;

    /**
     * @param tag Tag of the step the state belongs to
     * @param data Step-specific state
     */
    ResumeContext(std::string tag, boost::json::object data)
        : tag(std::move(tag)), data(std::move(data))
    {
    }
};
class ResumeContextProvider
{
std::filesystem::path path_;
bool enable_ = false;
public:
ResumeContextProvider(std::filesystem::path path, bool enable)
: path_{path}, enable_{enable}
{
if (enable)
clio::LogService::info()
<< "Resume context path: " << path_.string();
}
std::optional<ResumeContext>
load()
{
if (not enable_)
return std::nullopt;
if (not std::filesystem::exists(path_))
return std::nullopt;
auto const is = std::ifstream{path_.string()};
if (not is.is_open())
return std::nullopt;
auto buffer = std::stringstream{};
buffer << is.rdbuf();
auto const value = boost::json::parse(buffer.str());
if (not value.is_object())
return std::nullopt;
auto const& obj = value.as_object();
if (not obj.contains("step") or not obj.contains("state"))
return std::nullopt;
return std::make_optional<ResumeContext>(
std::string{obj.at("step").as_string().c_str()},
obj.at("state").as_object());
}
void
write(ResumeContext ctx)
{
std::ofstream os(path_.string());
if (os.good())
{
auto obj = boost::json::object{
{"step", ctx.tag},
{"state", ctx.data},
};
os << boost::json::serialize(obj) << std::endl;
}
}
};
/**
 * @brief A named unit of migration work.
 *
 * Couples a tag with the callable that performs the work for that tag.
 */
class Step
{
    std::string tag_;
    std::function<void(
        std::string const&,  // tag
        boost::asio::yield_context,
        Backend::LedgerRange const&,
        boost::json::object)>
        worker_;

public:
    /**
     * @param tag Name of the step
     * @param fn Callable invoked by perform(); it receives the tag, the
     * coroutine context, the ledger range and the resume state
     */
    Step(std::string tag, auto&& fn)
        // `fn` is a forwarding reference: forward it so that an lvalue
        // callable is copied rather than silently moved-from.
        : tag_{std::move(tag)}, worker_{std::forward<decltype(fn)>(fn)}
    {
    }

    /**
     * @brief Runs the step's worker.
     *
     * @param yield Coroutine context passed on to the worker
     * @param ledgerRange Ledger range passed on to the worker
     * @param resume State passed on to the worker; an empty object by default
     */
    void
    perform(
        boost::asio::yield_context yield,
        Backend::LedgerRange const& ledgerRange,
        boost::json::object resume = {})
    {
        worker_(tag_, yield, ledgerRange, std::move(resume));
    }

    /**
     * @return The name of the step
     */
    std::string
    tag() const
    {
        return tag_;
    }
};
class Migrator
{
std::reference_wrapper<boost::asio::io_context> ioc_;
std::reference_wrapper<clio::Config const> config_;
std::shared_ptr<Backend::CassandraBackend> backend_;
std::reference_wrapper<ResumeContextProvider> resumeProvider_;
boost::asio::steady_timer timer_;
std::vector<Step> steps_;
public:
Migrator(
boost::asio::io_context& ioc,
clio::Config const& config,
std::shared_ptr<Backend::CassandraBackend> backend,
ResumeContextProvider& resumeProvider,
std::vector<Step> steps)
: ioc_{std::ref(ioc)}
, config_{std::cref(config)}
, backend_{backend}
, resumeProvider_{std::ref(resumeProvider)}
, timer_{ioc}
, steps_{std::move(steps)}
{
boost::asio::spawn(
ioc,
[this, workGuard = boost::asio::make_work_guard(ioc)](auto yield) {
run(yield);
});
}
private:
void
run(boost::asio::yield_context yield)
{
clio::LogService::info() << "Beginning migration";
auto const ledgerRange = backend_->hardFetchLedgerRangeNoThrow(yield);
/*
* Step 0 - If we haven't downloaded the initial ledger yet, just short
* circuit.
*/
if (!ledgerRange)
{
clio::LogService::info() << "There is no data to migrate";
return;
}
auto resume = resumeProvider_.get().load();
for (auto& step : steps_)
{
if (resume)
{
if (resume->tag == step.tag())
{
step.perform(
yield,
*ledgerRange,
resume->data); // resume if possible
}
else
{
clio::LogService::info() << "-- Skipping " << step.tag();
continue; // skip this step
}
}
else
{
step.perform(yield, *ledgerRange); // start step from scratch
}
clio::LogService::info() << step.tag() << " done!\n";
resume = std::nullopt; // already used our resume state for
// previous step
}
clio::LogService::info()
<< "Completed migration from " << ledgerRange->minSequence << " to "
<< ledgerRange->maxSequence << "!\n";
}
};