From a8a87c2cff1a996b83b06167455edea0518fce2c Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 8 Mar 2022 18:20:20 -0500 Subject: [PATCH] parallel cache population --- src/etl/ReportingETL.cpp | 83 +++++++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 22 deletions(-) diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 804178a09..610ad47e1 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -828,10 +828,10 @@ ReportingETL::monitor() BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Database already populated. Picking up from the tip of history"; + loadCacheAsync(rng->maxSequence); } assert(rng); uint32_t nextSequence = rng->maxSequence + 1; - loadCacheAsync(rng->maxSequence); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -900,28 +900,67 @@ ReportingETL::loadCacheAsync(uint32_t seq) assert(false); return; } - std::thread t{[this, seq]() { - BOOST_LOG_TRIVIAL(info) << "Loading cache"; - std::optional cursor; - while (true) - { - auto res = Backend::synchronousAndRetryOnTimeout( - [this, seq, &cursor](auto yield) { - return backend_->fetchLedgerPage(cursor, seq, 4096, yield); - }); - backend_->cache().update(res.objects, seq, true); - if (!res.cursor) - break; - BOOST_LOG_TRIVIAL(debug) - << "Loading cache. cache size = " << backend_->cache().size() - << " - cursor = " << ripple::strHex(res.cursor.value()); - cursor = std::move(res.cursor); - } - BOOST_LOG_TRIVIAL(info) << "Finished loading cache"; + std::vector diff = + Backend::synchronousAndRetryOnTimeout( + [&](auto yield) { return backend_->fetchLedgerDiff(seq, yield); }); - backend_->cache().setFull(); - }}; - t.detach(); + std::sort( + diff.begin(), diff.end(), [](auto a, auto b) { return a.key < b.key; }); + std::vector> cursors; + cursors.push_back({}); + for (auto& obj : diff) + { + if (obj.blob.size()) + cursors.push_back({obj.key}); + } + cursors.push_back({}); + std::stringstream cursorStr; + for (auto& c : cursors) + { + if (c) + cursorStr << ripple::strHex(*c) << ", "; + } + BOOST_LOG_TRIVIAL(info) + << "Loading cache. num cursors = " << cursors.size() - 1 + << ". cursors = " << cursorStr.str(); + std::atomic_uint* numRemaining = new std::atomic_uint{cursors.size() - 1}; + + for (size_t i = 0; i < cursors.size() - 1; ++i) + { + std::optional start = cursors[i]; + std::optional end = cursors[i + 1]; + std::thread t{[this, seq, start, end, numRemaining]() { + std::optional cursor = start; + while (true) + { + auto res = Backend::synchronousAndRetryOnTimeout( + [this, seq, &cursor](auto yield) { + return backend_->fetchLedgerPage( + cursor, seq, 4096, yield); + }); + backend_->cache().update(res.objects, seq, true); + if (!res.cursor || (end && *(res.cursor) > *end)) + break; + BOOST_LOG_TRIVIAL(debug) + << "Loading cache. cache size = " + << backend_->cache().size() + << " - cursor = " << ripple::strHex(res.cursor.value()); + cursor = std::move(res.cursor); + } + if (--(*numRemaining) == 0) + { + BOOST_LOG_TRIVIAL(info) << "Finished loading cache"; + backend_->cache().setFull(); + delete numRemaining; + } + else + { + BOOST_LOG_TRIVIAL(info) + << "Finished a cursor. num remaining = " << *numRemaining; + } + }}; + t.detach(); + } } void