From bcd59ddf73be62d8e8a15c3a36ef28cb920af78a Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 16 Mar 2022 10:53:28 -0400 Subject: [PATCH] generate cursors for cache download with configurable number of diffs --- src/etl/ReportingETL.cpp | 49 +++++++++++++++++++++++++++++++--------- src/etl/ReportingETL.h | 4 ++++ 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index a528db814..db104299f 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -908,12 +908,28 @@ ReportingETL::loadCache(uint32_t seq) assert(false); return; } - std::vector diff = - Backend::synchronousAndRetryOnTimeout( - [&](auto yield) { return backend_->fetchLedgerDiff(seq, yield); }); + std::vector diff; + auto append = [](auto&& a, auto&& b) { + a.insert(std::end(a), std::begin(b), std::end(b)); + }; - std::sort( - diff.begin(), diff.end(), [](auto a, auto b) { return a.key < b.key; }); + for (size_t i = 0; i < numDiffs_; ++i) + { + append(diff, Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLedgerDiff(seq - i, yield); + })); + } + + std::sort(diff.begin(), diff.end(), [](auto a, auto b) { + return a.key < b.key || + (a.key == b.key && a.blob.size() < b.blob.size()); + }); + diff.erase( + std::unique( + diff.begin(), + diff.end(), + [](auto a, auto b) { return a.key == b.key; }), + diff.end()); std::vector> cursors; cursors.push_back({}); for (auto& obj : diff) @@ -929,17 +945,19 @@ ReportingETL::loadCache(uint32_t seq) cursorStr << ripple::strHex(*c) << ", "; } BOOST_LOG_TRIVIAL(info) - << "Loading cache. num cursors = " << cursors.size() - 1 - << ". cursors = " << cursorStr.str(); + << "Loading cache. num cursors = " << cursors.size() - 1; + BOOST_LOG_TRIVIAL(debug) << __func__ << " cursors = " << cursorStr.str(); + std::atomic_uint* numRemaining = new std::atomic_uint{cursors.size() - 1}; + auto startTime = std::chrono::system_clock::now(); for (size_t i = 0; i < cursors.size() - 1; ++i) { std::optional start = cursors[i]; std::optional end = cursors[i + 1]; boost::asio::spawn( ioContext_, - [this, seq, start, end, numRemaining]( + [this, seq, start, end, numRemaining, startTime]( boost::asio::yield_context yield) { std::optional cursor = start; while (true) @@ -960,9 +978,14 @@ ReportingETL::loadCache(uint32_t seq) } if (--(*numRemaining) == 0) { + auto endTime = std::chrono::system_clock::now(); + auto duration = + std::chrono::duration_cast( + endTime - startTime); BOOST_LOG_TRIVIAL(info) << "Finished loading cache. cache size = " - << backend_->cache().size(); + << backend_->cache().size() << ". Took " + << duration.count() << " seconds"; backend_->cache().setFull(); delete numRemaining; } @@ -982,9 +1005,9 @@ ReportingETL::loadCache(uint32_t seq) << "Cache not full. Cache size = " << backend_->cache().size() << ". Sleeping ..."; std::this_thread::sleep_for(std::chrono::seconds(10)); + BOOST_LOG_TRIVIAL(info) + << "Cache is full. Cache size = " << backend_->cache().size(); } - BOOST_LOG_TRIVIAL(info) - << "Cache is full. Cache size = " << backend_->cache().size(); } void @@ -1083,5 +1106,9 @@ ReportingETL::ReportingETL( if (entry == "none" || entry == "no") cacheLoadStyle_ = CacheLoadStyle::NOT_AT_ALL; } + if (cache.contains("num_diffs") && cache.at("num_diffs").as_int64()) + { + numDiffs_ = cache.at("num_diffs").as_int64(); + } } } diff --git a/src/etl/ReportingETL.h b/src/etl/ReportingETL.h index 690639005..e165fecce 100644 --- a/src/etl/ReportingETL.h +++ b/src/etl/ReportingETL.h @@ -50,6 +50,10 @@ private: CacheLoadStyle cacheLoadStyle_ = CacheLoadStyle::ASYNC; + // number of diffs to use to generate cursors to traverse the ledger in + // parallel during initial cache download + size_t numDiffs_ = 1; + std::thread worker_; boost::asio::io_context& ioContext_;