mirror of
https://github.com/XRPLF/clio.git
synced 2026-06-04 01:06:45 +00:00
@@ -10,36 +10,112 @@
|
||||
|
||||
#include <iostream>
|
||||
|
||||
void
|
||||
static std::uint32_t const MAX_RETRIES = 5;
|
||||
static std::chrono::seconds const WAIT_TIME = std::chrono::seconds(60);
|
||||
|
||||
static void
|
||||
wait(boost::asio::steady_timer& timer, std::string const reason)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info) << reason << ". Waiting";
|
||||
timer.expires_after(WAIT_TIME);
|
||||
timer.wait();
|
||||
BOOST_LOG_TRIVIAL(info) << "Done";
|
||||
}
|
||||
|
||||
static void
|
||||
doNFTWrite(
|
||||
std::vector<NFTsData>& nfts,
|
||||
Backend::CassandraBackend& backend,
|
||||
std::string const tag)
|
||||
{
|
||||
if (nfts.size() > 0)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< tag << ": About to write " << nfts.size() << " records...";
|
||||
backend.writeNFTs(std::move(nfts));
|
||||
backend.sync();
|
||||
BOOST_LOG_TRIVIAL(info) << tag << ": Done";
|
||||
}
|
||||
else
|
||||
BOOST_LOG_TRIVIAL(info) << tag << ": No records to write";
|
||||
if (nfts.size() <= 0)
|
||||
return;
|
||||
auto const size = nfts.size();
|
||||
backend.writeNFTs(std::move(nfts));
|
||||
backend.sync();
|
||||
BOOST_LOG_TRIVIAL(info) << tag << ": Wrote " << size << " records";
|
||||
}
|
||||
|
||||
void
|
||||
static std::optional<Backend::TransactionAndMetadata>
|
||||
doTryFetchTransaction(
|
||||
boost::asio::steady_timer& timer,
|
||||
Backend::CassandraBackend& backend,
|
||||
ripple::uint256 const& hash,
|
||||
boost::asio::yield_context& yield,
|
||||
std::uint32_t const attempts = 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
return backend.fetchTransaction(hash, yield);
|
||||
}
|
||||
catch (Backend::DatabaseTimeout const& e)
|
||||
{
|
||||
if (attempts >= MAX_RETRIES)
|
||||
throw e;
|
||||
|
||||
wait(timer, "Transaction read error");
|
||||
return doTryFetchTransaction(timer, backend, hash, yield, attempts + 1);
|
||||
}
|
||||
}
|
||||
|
||||
static Backend::LedgerPage
|
||||
doTryFetchLedgerPage(
|
||||
boost::asio::steady_timer& timer,
|
||||
Backend::CassandraBackend& backend,
|
||||
std::optional<ripple::uint256> const& cursor,
|
||||
std::uint32_t const sequence,
|
||||
boost::asio::yield_context& yield,
|
||||
std::uint32_t const attempts = 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
return backend.fetchLedgerPage(cursor, sequence, 2000, false, yield);
|
||||
}
|
||||
catch (Backend::DatabaseTimeout const& e)
|
||||
{
|
||||
if (attempts >= MAX_RETRIES)
|
||||
throw e;
|
||||
|
||||
wait(timer, "Page read error");
|
||||
return doTryFetchLedgerPage(
|
||||
timer, backend, cursor, sequence, yield, attempts + 1);
|
||||
}
|
||||
}
|
||||
|
||||
static const CassResult*
|
||||
doTryGetTxPageResult(
|
||||
CassStatement* const query,
|
||||
boost::asio::steady_timer& timer,
|
||||
Backend::CassandraBackend& backend,
|
||||
std::uint32_t const attempts = 0)
|
||||
{
|
||||
CassFuture* fut = cass_session_execute(backend.cautionGetSession(), query);
|
||||
CassResult const* result = cass_future_get_result(fut);
|
||||
cass_future_free(fut);
|
||||
|
||||
if (result != nullptr)
|
||||
return result;
|
||||
|
||||
if (attempts >= MAX_RETRIES)
|
||||
throw std::runtime_error("Already retried too many times");
|
||||
|
||||
wait(timer, "Unexpected empty result from tx paging");
|
||||
return doTryGetTxPageResult(query, timer, backend, attempts + 1);
|
||||
}
|
||||
|
||||
static void
|
||||
doMigration(
|
||||
Backend::CassandraBackend& backend,
|
||||
boost::asio::steady_timer& timer,
|
||||
boost::asio::yield_context yield)
|
||||
boost::asio::yield_context& yield)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info) << "Beginning migration";
|
||||
auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield);
|
||||
|
||||
/*
|
||||
* Step 0 - If we haven't downloaded the initial ledger yet, just short
|
||||
* circuit.
|
||||
*/
|
||||
auto const ledgerRange = backend.hardFetchLedgerRangeNoThrow(yield);
|
||||
if (!ledgerRange)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info) << "There is no data to migrate";
|
||||
@@ -47,7 +123,7 @@ doMigration(
|
||||
}
|
||||
|
||||
/*
|
||||
* Step 1 - Look at all NFT transactions, recording in
|
||||
* Step 1 - Look at all NFT transactions recorded in
|
||||
* `nf_token_transactions` and reload any NFTokenMint transactions. These
|
||||
* will contain the URI of any tokens that were minted after our start
|
||||
* sequence. We look at transactions for this step instead of directly at
|
||||
@@ -61,37 +137,13 @@ doMigration(
|
||||
cass_statement_set_paging_size(nftTxQuery, 1000);
|
||||
cass_bool_t morePages = cass_true;
|
||||
|
||||
std::uint32_t retries = 0;
|
||||
|
||||
// For all NFT txs, paginated in groups of 1000...
|
||||
while (morePages)
|
||||
{
|
||||
std::vector<NFTsData> toWrite;
|
||||
CassFuture* fut =
|
||||
cass_session_execute(backend.cautionGetSession(), nftTxQuery);
|
||||
CassResult const* result = cass_future_get_result(fut);
|
||||
if (result == nullptr)
|
||||
{
|
||||
cass_future_free(fut);
|
||||
|
||||
if (retries > 5)
|
||||
{
|
||||
cass_statement_free(nftTxQuery);
|
||||
throw std::runtime_error(
|
||||
"Already retried 5 times on same page. Aborting");
|
||||
}
|
||||
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Unexepcted empty result from nf_token_transactions, "
|
||||
"waiting for a minute and trying again";
|
||||
timer.expires_after(std::chrono::seconds(60));
|
||||
timer.wait();
|
||||
BOOST_LOG_TRIVIAL(info) << "Done waiting";
|
||||
retries++;
|
||||
continue;
|
||||
}
|
||||
|
||||
retries = 0;
|
||||
CassResult const* result =
|
||||
doTryGetTxPageResult(nftTxQuery, timer, backend);
|
||||
|
||||
// For each tx in page...
|
||||
CassIterator* txPageIterator = cass_iterator_from_result(result);
|
||||
@@ -100,7 +152,7 @@ doMigration(
|
||||
cass_byte_t const* buf;
|
||||
std::size_t bufSize;
|
||||
|
||||
CassError rc = cass_value_get_bytes(
|
||||
CassError const rc = cass_value_get_bytes(
|
||||
cass_row_get_column(cass_iterator_get_row(txPageIterator), 0),
|
||||
&buf,
|
||||
&bufSize);
|
||||
@@ -108,25 +160,27 @@ doMigration(
|
||||
{
|
||||
cass_iterator_free(txPageIterator);
|
||||
cass_result_free(result);
|
||||
cass_future_free(fut);
|
||||
cass_statement_free(nftTxQuery);
|
||||
throw std::runtime_error(
|
||||
"Could not retrieve hash from nf_token_transactions");
|
||||
}
|
||||
|
||||
auto const txHash = ripple::uint256::fromVoid(buf);
|
||||
auto const tx = backend.fetchTransaction(txHash, yield);
|
||||
auto const tx =
|
||||
doTryFetchTransaction(timer, backend, txHash, yield);
|
||||
if (!tx)
|
||||
{
|
||||
cass_iterator_free(txPageIterator);
|
||||
cass_result_free(result);
|
||||
cass_future_free(fut);
|
||||
cass_statement_free(nftTxQuery);
|
||||
std::stringstream ss;
|
||||
ss << "Could not fetch tx with hash "
|
||||
<< ripple::to_string(txHash);
|
||||
throw std::runtime_error(ss.str());
|
||||
}
|
||||
|
||||
// Not really sure how cassandra paging works, but we want to skip
|
||||
// any transactions that were loaded since the migration started
|
||||
if (tx->ledgerSequence > ledgerRange->maxSequence)
|
||||
continue;
|
||||
|
||||
@@ -141,7 +195,6 @@ doMigration(
|
||||
std::get<1>(getNFTDataFromTx(txMeta, sttx)).value());
|
||||
}
|
||||
|
||||
// write what we have
|
||||
doNFTWrite(toWrite, backend, "TX");
|
||||
|
||||
morePages = cass_result_has_more_pages(result);
|
||||
@@ -149,10 +202,10 @@ doMigration(
|
||||
cass_statement_set_paging_state(nftTxQuery, result);
|
||||
cass_iterator_free(txPageIterator);
|
||||
cass_result_free(result);
|
||||
cass_future_free(fut);
|
||||
}
|
||||
|
||||
cass_statement_free(nftTxQuery);
|
||||
BOOST_LOG_TRIVIAL(info) << "\nDone with transaction loading!\n";
|
||||
|
||||
/*
|
||||
* Step 2 - Pull every object from our initial ledger and load all NFTs
|
||||
@@ -165,20 +218,21 @@ doMigration(
|
||||
|
||||
do
|
||||
{
|
||||
auto const page = backend.fetchLedgerPage(
|
||||
cursor, ledgerRange->minSequence, 2000, false, yield);
|
||||
auto const page = doTryFetchLedgerPage(
|
||||
timer, backend, cursor, ledgerRange->minSequence, yield);
|
||||
for (auto const& object : page.objects)
|
||||
{
|
||||
std::string blobStr(object.blob.begin(), object.blob.end());
|
||||
std::vector<NFTsData> toWrite = getNFTDataFromObj(
|
||||
ledgerRange->minSequence,
|
||||
ripple::to_string(object.key),
|
||||
blobStr);
|
||||
std::string(object.blob.begin(), object.blob.end()));
|
||||
doNFTWrite(toWrite, backend, "OBJ");
|
||||
}
|
||||
cursor = page.cursor;
|
||||
} while (cursor.has_value());
|
||||
|
||||
BOOST_LOG_TRIVIAL(info) << "\nDone with object loading!\n";
|
||||
|
||||
/*
|
||||
* Step 3 - Drop the old `issuer_nf_tokens` table, which is replaced by
|
||||
* `issuer_nf_tokens_v2`. Normally, we should probably not drop old tables
|
||||
@@ -191,19 +245,20 @@ doMigration(
|
||||
cass_statement_new(query.str().c_str(), 0);
|
||||
CassFuture* fut =
|
||||
cass_session_execute(backend.cautionGetSession(), issuerDropTableQuery);
|
||||
CassError rc = cass_future_error_code(fut);
|
||||
CassError const rc = cass_future_error_code(fut);
|
||||
cass_future_free(fut);
|
||||
cass_statement_free(issuerDropTableQuery);
|
||||
if (rc != CASS_OK)
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< "Could not drop old issuer_nf_tokens table. If it still exists, "
|
||||
"you should drop it yourself";
|
||||
|
||||
backend.sync();
|
||||
if (rc != CASS_OK)
|
||||
BOOST_LOG_TRIVIAL(warning) << "\nCould not drop old issuer_nf_tokens "
|
||||
"table. If it still exists, "
|
||||
"you should drop it yourself\n";
|
||||
else
|
||||
BOOST_LOG_TRIVIAL(info) << "\nDropped old 'issuer_nf_tokens' table!\n";
|
||||
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Completed migration from " << ledgerRange->minSequence << " to "
|
||||
<< ledgerRange->maxSequence;
|
||||
<< "\nCompleted migration from " << ledgerRange->minSequence << " to "
|
||||
<< ledgerRange->maxSequence << "!\n";
|
||||
}
|
||||
|
||||
int
|
||||
@@ -232,14 +287,14 @@ main(int argc, char* argv[])
|
||||
}
|
||||
|
||||
boost::asio::io_context ioc;
|
||||
boost::asio::steady_timer timer{ioc};
|
||||
auto workGuard = boost::asio::make_work_guard(ioc);
|
||||
auto backend = Backend::make_Backend(ioc, config);
|
||||
|
||||
boost::asio::steady_timer timer(ioc);
|
||||
auto work = boost::asio::make_work_guard(ioc);
|
||||
boost::asio::spawn(
|
||||
ioc, [&backend, &work, &timer](boost::asio::yield_context yield) {
|
||||
ioc, [&backend, &workGuard, &timer](boost::asio::yield_context yield) {
|
||||
doMigration(*backend, timer, yield);
|
||||
work.reset();
|
||||
workGuard.reset();
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
|
||||
Reference in New Issue
Block a user