mirror of
https://github.com/XRPLF/clio.git
synced 2026-06-03 16:56:45 +00:00
populate cache using coroutines. optionally disable cache
This commit is contained in:
@@ -24,6 +24,7 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
|
||||
BOOST_LOG_TRIVIAL(error)
|
||||
<< "ERROR!!! Cassandra write error: " << rc << ", "
|
||||
<< cass_error_desc(rc) << " id= " << requestParams.toString()
|
||||
<< ", current retries " << requestParams.currentRetries
|
||||
<< ", retrying in " << wait.count() << " milliseconds";
|
||||
++requestParams.currentRetries;
|
||||
std::shared_ptr<boost::asio::steady_timer> timer =
|
||||
|
||||
@@ -288,15 +288,14 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
|
||||
auto firstBook = std::move(*obj.mutable_first_book());
|
||||
if (!firstBook.size())
|
||||
firstBook = uint256ToString(Backend::lastKey);
|
||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " writing book successor "
|
||||
<< ripple::strHex(obj.book_base()) << " - "
|
||||
<< ripple::strHex(firstBook);
|
||||
|
||||
backend_->writeSuccessor(
|
||||
std::move(*obj.mutable_book_base()),
|
||||
lgrInfo.seq,
|
||||
std::move(firstBook));
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " writing book successor "
|
||||
<< ripple::strHex(obj.book_base()) << " - "
|
||||
<< ripple::strHex(firstBook);
|
||||
}
|
||||
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
|
||||
{
|
||||
@@ -832,7 +831,7 @@ ReportingETL::monitor()
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " : "
|
||||
<< "Database already populated. Picking up from the tip of history";
|
||||
loadCacheAsync(rng->maxSequence);
|
||||
loadCache(rng->maxSequence);
|
||||
}
|
||||
assert(rng);
|
||||
uint32_t nextSequence = rng->maxSequence + 1;
|
||||
@@ -889,8 +888,13 @@ ReportingETL::monitor()
|
||||
}
|
||||
|
||||
void
|
||||
ReportingETL::loadCacheAsync(uint32_t seq)
|
||||
ReportingETL::loadCache(uint32_t seq)
|
||||
{
|
||||
if (cacheLoadStyle_ == CacheLoadStyle::NOT_AT_ALL)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning) << "Cache is disabled. Not loading";
|
||||
return;
|
||||
}
|
||||
// sanity check to make sure we are not calling this multiple times
|
||||
static std::atomic_bool loading = false;
|
||||
if (loading)
|
||||
@@ -933,38 +937,55 @@ ReportingETL::loadCacheAsync(uint32_t seq)
|
||||
{
|
||||
std::optional<ripple::uint256> start = cursors[i];
|
||||
std::optional<ripple::uint256> end = cursors[i + 1];
|
||||
std::thread t{[this, seq, start, end, numRemaining]() {
|
||||
std::optional<ripple::uint256> cursor = start;
|
||||
while (true)
|
||||
{
|
||||
auto res = Backend::synchronousAndRetryOnTimeout(
|
||||
[this, seq, &cursor](auto yield) {
|
||||
return backend_->fetchLedgerPage(
|
||||
cursor, seq, 4096, yield);
|
||||
});
|
||||
backend_->cache().update(res.objects, seq, true);
|
||||
if (!res.cursor || (end && *(res.cursor) > *end))
|
||||
break;
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Loading cache. cache size = "
|
||||
<< backend_->cache().size()
|
||||
<< " - cursor = " << ripple::strHex(res.cursor.value());
|
||||
cursor = std::move(res.cursor);
|
||||
}
|
||||
if (--(*numRemaining) == 0)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info) << "Finished loading cache";
|
||||
backend_->cache().setFull();
|
||||
delete numRemaining;
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Finished a cursor. num remaining = " << *numRemaining;
|
||||
}
|
||||
}};
|
||||
t.detach();
|
||||
cacheDownloadStrands_.emplace_back(ioContext_);
|
||||
boost::asio::spawn(
|
||||
cacheDownloadStrands_.back(),
|
||||
[this, seq, start, end, numRemaining](
|
||||
boost::asio::yield_context yield) {
|
||||
std::optional<ripple::uint256> cursor = start;
|
||||
while (true)
|
||||
{
|
||||
auto res =
|
||||
Backend::retryOnTimeout([this, seq, &cursor, &yield]() {
|
||||
return backend_->fetchLedgerPage(
|
||||
cursor, seq, 256, yield);
|
||||
});
|
||||
backend_->cache().update(res.objects, seq, true);
|
||||
if (!res.cursor || (end && *(res.cursor) > *end))
|
||||
break;
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Loading cache. cache size = "
|
||||
<< backend_->cache().size()
|
||||
<< " - cursor = " << ripple::strHex(res.cursor.value());
|
||||
cursor = std::move(res.cursor);
|
||||
}
|
||||
if (--(*numRemaining) == 0)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Finished loading cache. cache size = "
|
||||
<< backend_->cache().size();
|
||||
backend_->cache().setFull();
|
||||
delete numRemaining;
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Finished a cursor. num remaining = "
|
||||
<< *numRemaining;
|
||||
}
|
||||
});
|
||||
}
|
||||
// If loading synchronously, poll cache until full
|
||||
while (cacheLoadStyle_ == CacheLoadStyle::SYNC &&
|
||||
!backend_->cache().isFull())
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "Cache not full. Cache size = " << backend_->cache().size()
|
||||
<< ". Sleeping ...";
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Cache is full. Cache size = " << backend_->cache().size();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -980,7 +1001,7 @@ ReportingETL::monitorReadOnly()
|
||||
return;
|
||||
else
|
||||
latestSequence = rng->maxSequence;
|
||||
loadCacheAsync(latestSequence);
|
||||
loadCache(latestSequence);
|
||||
latestSequence++;
|
||||
while (true)
|
||||
{
|
||||
@@ -1049,4 +1070,19 @@ ReportingETL::ReportingETL(
|
||||
extractorThreads_ = config.at("extractor_threads").as_int64();
|
||||
if (config.contains("txn_threshold"))
|
||||
txnThreshold_ = config.at("txn_threshold").as_int64();
|
||||
if (config.contains("cache"))
|
||||
{
|
||||
auto cache = config.at("cache").as_object();
|
||||
if (cache.contains("load") && cache.at("load").is_string())
|
||||
{
|
||||
auto entry = config.at("cache").as_object().at("load").as_string();
|
||||
boost::algorithm::to_lower(entry);
|
||||
if (entry == "sync")
|
||||
cacheLoadStyle_ = CacheLoadStyle::SYNC;
|
||||
if (entry == "async")
|
||||
cacheLoadStyle_ = CacheLoadStyle::ASYNC;
|
||||
if (entry == "none" || entry == "no")
|
||||
cacheLoadStyle_ = CacheLoadStyle::NOT_AT_ALL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,10 @@ private:
|
||||
std::optional<std::uint32_t> onlineDeleteInterval_;
|
||||
std::uint32_t extractorThreads_ = 1;
|
||||
|
||||
enum class CacheLoadStyle { ASYNC, SYNC, NOT_AT_ALL };
|
||||
|
||||
CacheLoadStyle cacheLoadStyle_ = CacheLoadStyle::ASYNC;
|
||||
|
||||
std::thread worker_;
|
||||
boost::asio::io_context& ioContext_;
|
||||
|
||||
@@ -63,6 +67,8 @@ private:
|
||||
/// ledgers are published in order
|
||||
boost::asio::io_context::strand publishStrand_;
|
||||
|
||||
std::vector<boost::asio::io_context::strand> cacheDownloadStrands_;
|
||||
|
||||
/// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an
|
||||
/// arbitrary number of ETL sources and load balances ETL requests across
|
||||
/// those sources.
|
||||
@@ -148,9 +154,11 @@ private:
|
||||
loadInitialLedger(uint32_t sequence);
|
||||
|
||||
/// Populates the cache by walking through the given ledger. Should only be
|
||||
/// called once
|
||||
/// called once. The default behavior is to return immediately and populate
|
||||
/// the cache in the background. This can be overridden via config
|
||||
/// parameter, to populate synchronously, or not at all
|
||||
void
|
||||
loadCacheAsync(uint32_t seq);
|
||||
loadCache(uint32_t seq);
|
||||
|
||||
/// Run ETL. Extracts ledgers and writes them to the database, until a
|
||||
/// write conflict occurs (or the server shuts down).
|
||||
|
||||
Reference in New Issue
Block a user