mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
parallel cache population
This commit is contained in:
@@ -828,10 +828,10 @@ ReportingETL::monitor()
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " : "
|
||||
<< "Database already populated. Picking up from the tip of history";
|
||||
loadCacheAsync(rng->maxSequence);
|
||||
}
|
||||
assert(rng);
|
||||
uint32_t nextSequence = rng->maxSequence + 1;
|
||||
loadCacheAsync(rng->maxSequence);
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< __func__ << " : "
|
||||
@@ -900,28 +900,67 @@ ReportingETL::loadCacheAsync(uint32_t seq)
|
||||
assert(false);
|
||||
return;
|
||||
}
|
||||
std::thread t{[this, seq]() {
|
||||
BOOST_LOG_TRIVIAL(info) << "Loading cache";
|
||||
std::optional<ripple::uint256> cursor;
|
||||
while (true)
|
||||
{
|
||||
auto res = Backend::synchronousAndRetryOnTimeout(
|
||||
[this, seq, &cursor](auto yield) {
|
||||
return backend_->fetchLedgerPage(cursor, seq, 4096, yield);
|
||||
});
|
||||
backend_->cache().update(res.objects, seq, true);
|
||||
if (!res.cursor)
|
||||
break;
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Loading cache. cache size = " << backend_->cache().size()
|
||||
<< " - cursor = " << ripple::strHex(res.cursor.value());
|
||||
cursor = std::move(res.cursor);
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(info) << "Finished loading cache";
|
||||
std::vector<Backend::LedgerObject> diff =
|
||||
Backend::synchronousAndRetryOnTimeout(
|
||||
[&](auto yield) { return backend_->fetchLedgerDiff(seq, yield); });
|
||||
|
||||
backend_->cache().setFull();
|
||||
}};
|
||||
t.detach();
|
||||
std::sort(
|
||||
diff.begin(), diff.end(), [](auto a, auto b) { return a.key < b.key; });
|
||||
std::vector<std::optional<ripple::uint256>> cursors;
|
||||
cursors.push_back({});
|
||||
for (auto& obj : diff)
|
||||
{
|
||||
if (obj.blob.size())
|
||||
cursors.push_back({obj.key});
|
||||
}
|
||||
cursors.push_back({});
|
||||
std::stringstream cursorStr;
|
||||
for (auto& c : cursors)
|
||||
{
|
||||
if (c)
|
||||
cursorStr << ripple::strHex(*c) << ", ";
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Loading cache. num cursors = " << cursors.size() - 1
|
||||
<< ". cursors = " << cursorStr.str();
|
||||
std::atomic_uint* numRemaining = new std::atomic_uint{cursors.size() - 1};
|
||||
|
||||
for (size_t i = 0; i < cursors.size() - 1; ++i)
|
||||
{
|
||||
std::optional<ripple::uint256> start = cursors[i];
|
||||
std::optional<ripple::uint256> end = cursors[i + 1];
|
||||
std::thread t{[this, seq, start, end, numRemaining]() {
|
||||
std::optional<ripple::uint256> cursor = start;
|
||||
while (true)
|
||||
{
|
||||
auto res = Backend::synchronousAndRetryOnTimeout(
|
||||
[this, seq, &cursor](auto yield) {
|
||||
return backend_->fetchLedgerPage(
|
||||
cursor, seq, 4096, yield);
|
||||
});
|
||||
backend_->cache().update(res.objects, seq, true);
|
||||
if (!res.cursor || (end && *(res.cursor) > *end))
|
||||
break;
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Loading cache. cache size = "
|
||||
<< backend_->cache().size()
|
||||
<< " - cursor = " << ripple::strHex(res.cursor.value());
|
||||
cursor = std::move(res.cursor);
|
||||
}
|
||||
if (--(*numRemaining) == 0)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info) << "Finished loading cache";
|
||||
backend_->cache().setFull();
|
||||
delete numRemaining;
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Finished a cursor. num remaining = " << *numRemaining;
|
||||
}
|
||||
}};
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
Reference in New Issue
Block a user