From 15505905cbb77628decc7715ffaa55c7c3425970 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 15 Mar 2022 14:48:14 -0400 Subject: [PATCH] populate cache using coroutines. optionally disable cache --- src/backend/CassandraBackend.cpp | 1 + src/etl/ReportingETL.cpp | 112 ++++++++++++++++++++----------- src/etl/ReportingETL.h | 12 +++- 3 files changed, 85 insertions(+), 40 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 728a2a929..0782b19d1 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -24,6 +24,7 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) BOOST_LOG_TRIVIAL(error) << "ERROR!!! Cassandra write error: " << rc << ", " << cass_error_desc(rc) << " id= " << requestParams.toString() + << ", current retries " << requestParams.currentRetries << ", retrying in " << wait.count() << " milliseconds"; ++requestParams.currentRetries; std::shared_ptr timer = diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 4a06e362a..5a740811e 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -288,15 +288,14 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) auto firstBook = std::move(*obj.mutable_first_book()); if (!firstBook.size()) firstBook = uint256ToString(Backend::lastKey); + BOOST_LOG_TRIVIAL(debug) << __func__ << " writing book successor " + << ripple::strHex(obj.book_base()) << " - " + << ripple::strHex(firstBook); backend_->writeSuccessor( std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook)); - - BOOST_LOG_TRIVIAL(debug) << __func__ << " writing book successor " - << ripple::strHex(obj.book_base()) << " - " - << ripple::strHex(firstBook); } for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) { @@ -832,7 +831,7 @@ ReportingETL::monitor() BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Database already populated. Picking up from the tip of history"; - loadCacheAsync(rng->maxSequence); + loadCache(rng->maxSequence); } assert(rng); uint32_t nextSequence = rng->maxSequence + 1; @@ -889,8 +888,13 @@ ReportingETL::monitor() } void -ReportingETL::loadCacheAsync(uint32_t seq) +ReportingETL::loadCache(uint32_t seq) { + if (cacheLoadStyle_ == CacheLoadStyle::NOT_AT_ALL) + { + BOOST_LOG_TRIVIAL(warning) << "Cache is disabled. Not loading"; + return; + } // sanity check to make sure we are not calling this multiple times static std::atomic_bool loading = false; if (loading) @@ -933,38 +937,55 @@ ReportingETL::loadCacheAsync(uint32_t seq) { std::optional start = cursors[i]; std::optional end = cursors[i + 1]; - std::thread t{[this, seq, start, end, numRemaining]() { - std::optional cursor = start; - while (true) - { - auto res = Backend::synchronousAndRetryOnTimeout( - [this, seq, &cursor](auto yield) { - return backend_->fetchLedgerPage( - cursor, seq, 4096, yield); - }); - backend_->cache().update(res.objects, seq, true); - if (!res.cursor || (end && *(res.cursor) > *end)) - break; - BOOST_LOG_TRIVIAL(debug) - << "Loading cache. cache size = " - << backend_->cache().size() - << " - cursor = " << ripple::strHex(res.cursor.value()); - cursor = std::move(res.cursor); - } - if (--(*numRemaining) == 0) - { - BOOST_LOG_TRIVIAL(info) << "Finished loading cache"; - backend_->cache().setFull(); - delete numRemaining; - } - else - { - BOOST_LOG_TRIVIAL(info) - << "Finished a cursor. num remaining = " << *numRemaining; - } - }}; - t.detach(); + cacheDownloadStrands_.emplace_back(ioContext_); + boost::asio::spawn( + cacheDownloadStrands_.back(), + [this, seq, start, end, numRemaining]( + boost::asio::yield_context yield) { + std::optional cursor = start; + while (true) + { + auto res = + Backend::retryOnTimeout([this, seq, &cursor, &yield]() { + return backend_->fetchLedgerPage( + cursor, seq, 256, yield); + }); + backend_->cache().update(res.objects, seq, true); + if (!res.cursor || (end && *(res.cursor) > *end)) + break; + BOOST_LOG_TRIVIAL(debug) + << "Loading cache. cache size = " + << backend_->cache().size() + << " - cursor = " << ripple::strHex(res.cursor.value()); + cursor = std::move(res.cursor); + } + if (--(*numRemaining) == 0) + { + BOOST_LOG_TRIVIAL(info) + << "Finished loading cache. cache size = " + << backend_->cache().size(); + backend_->cache().setFull(); + delete numRemaining; + } + else + { + BOOST_LOG_TRIVIAL(info) + << "Finished a cursor. num remaining = " + << *numRemaining; + } + }); } + // If loading synchronously, poll cache until full + while (cacheLoadStyle_ == CacheLoadStyle::SYNC && + !backend_->cache().isFull()) + { + BOOST_LOG_TRIVIAL(debug) + << "Cache not full. Cache size = " << backend_->cache().size() + << ". Sleeping ..."; + std::this_thread::sleep_for(std::chrono::seconds(10)); + } + BOOST_LOG_TRIVIAL(info) + << "Cache is full. Cache size = " << backend_->cache().size(); } void @@ -980,7 +1001,7 @@ ReportingETL::monitorReadOnly() return; else latestSequence = rng->maxSequence; - loadCacheAsync(latestSequence); + loadCache(latestSequence); latestSequence++; while (true) { @@ -1049,4 +1070,19 @@ ReportingETL::ReportingETL( extractorThreads_ = config.at("extractor_threads").as_int64(); if (config.contains("txn_threshold")) txnThreshold_ = config.at("txn_threshold").as_int64(); + if (config.contains("cache")) + { + auto cache = config.at("cache").as_object(); + if (cache.contains("load") && cache.at("load").is_string()) + { + auto entry = config.at("cache").as_object().at("load").as_string(); + boost::algorithm::to_lower(entry); + if (entry == "sync") + cacheLoadStyle_ = CacheLoadStyle::SYNC; + if (entry == "async") + cacheLoadStyle_ = CacheLoadStyle::ASYNC; + if (entry == "none" || entry == "no") + cacheLoadStyle_ = CacheLoadStyle::NOT_AT_ALL; + } + } } diff --git a/src/etl/ReportingETL.h b/src/etl/ReportingETL.h index 3880e2860..52252fb46 100644 --- a/src/etl/ReportingETL.h +++ b/src/etl/ReportingETL.h @@ -46,6 +46,10 @@ private: std::optional onlineDeleteInterval_; std::uint32_t extractorThreads_ = 1; + enum class CacheLoadStyle { ASYNC, SYNC, NOT_AT_ALL }; + + CacheLoadStyle cacheLoadStyle_ = CacheLoadStyle::ASYNC; + std::thread worker_; boost::asio::io_context& ioContext_; @@ -63,6 +67,8 @@ private: /// ledgers are published in order boost::asio::io_context::strand publishStrand_; + std::vector cacheDownloadStrands_; + /// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an /// arbitrary number of ETL sources and load balances ETL requests across /// those sources. @@ -148,9 +154,11 @@ private: loadInitialLedger(uint32_t sequence); /// Populates the cache by walking through the given ledger. Should only be - /// called once + /// called once. The default behavior is to return immediately and populate + /// the cache in the background. This can be overridden via config + /// parameter, to populate synchronously, or not at all void - loadCacheAsync(uint32_t seq); + loadCache(uint32_t seq); /// Run ETL. Extracts ledgers and writes them to the database, until a /// write conflict occurs (or the server shuts down).