generate cursors for cache download with configurable number of diffs

This commit is contained in:
CJ Cobb
2022-03-16 10:53:28 -04:00
committed by CJ Cobb
parent f8437b9ba4
commit bcd59ddf73
2 changed files with 42 additions and 11 deletions

View File

@@ -908,12 +908,28 @@ ReportingETL::loadCache(uint32_t seq)
assert(false);
return;
}
std::vector<Backend::LedgerObject> diff =
Backend::synchronousAndRetryOnTimeout(
[&](auto yield) { return backend_->fetchLedgerDiff(seq, yield); });
std::vector<Backend::LedgerObject> diff;
auto append = [](auto&& a, auto&& b) {
a.insert(std::end(a), std::begin(b), std::end(b));
};
std::sort(
diff.begin(), diff.end(), [](auto a, auto b) { return a.key < b.key; });
for (size_t i = 0; i < numDiffs_; ++i)
{
append(diff, Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerDiff(seq - i, yield);
}));
}
std::sort(diff.begin(), diff.end(), [](auto a, auto b) {
return a.key < b.key ||
(a.key == b.key && a.blob.size() < b.blob.size());
});
diff.erase(
std::unique(
diff.begin(),
diff.end(),
[](auto a, auto b) { return a.key == b.key; }),
diff.end());
std::vector<std::optional<ripple::uint256>> cursors;
cursors.push_back({});
for (auto& obj : diff)
@@ -929,17 +945,19 @@ ReportingETL::loadCache(uint32_t seq)
cursorStr << ripple::strHex(*c) << ", ";
}
BOOST_LOG_TRIVIAL(info)
<< "Loading cache. num cursors = " << cursors.size() - 1
<< ". cursors = " << cursorStr.str();
<< "Loading cache. num cursors = " << cursors.size() - 1;
BOOST_LOG_TRIVIAL(debug) << __func__ << " cursors = " << cursorStr.str();
std::atomic_uint* numRemaining = new std::atomic_uint{cursors.size() - 1};
auto startTime = std::chrono::system_clock::now();
for (size_t i = 0; i < cursors.size() - 1; ++i)
{
std::optional<ripple::uint256> start = cursors[i];
std::optional<ripple::uint256> end = cursors[i + 1];
boost::asio::spawn(
ioContext_,
[this, seq, start, end, numRemaining](
[this, seq, start, end, numRemaining, startTime](
boost::asio::yield_context yield) {
std::optional<ripple::uint256> cursor = start;
while (true)
@@ -960,9 +978,14 @@ ReportingETL::loadCache(uint32_t seq)
}
if (--(*numRemaining) == 0)
{
auto endTime = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(
endTime - startTime);
BOOST_LOG_TRIVIAL(info)
<< "Finished loading cache. cache size = "
<< backend_->cache().size();
<< backend_->cache().size() << ". Took "
<< duration.count() << " seconds";
backend_->cache().setFull();
delete numRemaining;
}
@@ -982,9 +1005,9 @@ ReportingETL::loadCache(uint32_t seq)
<< "Cache not full. Cache size = " << backend_->cache().size()
<< ". Sleeping ...";
std::this_thread::sleep_for(std::chrono::seconds(10));
BOOST_LOG_TRIVIAL(info)
<< "Cache is full. Cache size = " << backend_->cache().size();
}
BOOST_LOG_TRIVIAL(info)
<< "Cache is full. Cache size = " << backend_->cache().size();
}
void
@@ -1083,5 +1106,9 @@ ReportingETL::ReportingETL(
if (entry == "none" || entry == "no")
cacheLoadStyle_ = CacheLoadStyle::NOT_AT_ALL;
}
if (cache.contains("num_diffs") && cache.at("num_diffs").as_int64())
{
numDiffs_ = cache.at("num_diffs").as_int64();
}
}
}

View File

@@ -50,6 +50,10 @@ private:
CacheLoadStyle cacheLoadStyle_ = CacheLoadStyle::ASYNC;
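// Number of ledger diffs, counting backwards from the ledger being loaded,
// whose objects are merged to generate the cursors used to traverse the
// ledger in parallel during the initial cache download. Defaults to 1 and
// can be overridden by a non-zero "num_diffs" entry in the cache config.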
size_t numDiffs_ = 1;
std::thread worker_;
boost::asio::io_context& ioContext_;