From b332c388d2771cddf533f7f0f30f0574933a5a93 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 4 Mar 2022 16:34:47 -0500 Subject: [PATCH] Load cache from our own database --- src/backend/BackendInterface.cpp | 1 - src/backend/BackendInterface.h | 1 - src/backend/CassandraBackend.cpp | 13 ++------ src/backend/PostgresBackend.cpp | 13 ++------ src/backend/Types.h | 2 -- src/etl/ReportingETL.cpp | 56 ++++++++++++++++++++++++-------- src/etl/ReportingETL.h | 12 +++++-- src/rpc/handlers/BookOffers.cpp | 7 +--- src/rpc/handlers/LedgerData.cpp | 13 +------- src/rpc/handlers/Subscribe.cpp | 5 ++- unittests/main.cpp | 6 ++-- 11 files changed, 61 insertions(+), 68 deletions(-) diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 716fb98e..cbf3edaa 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -244,7 +244,6 @@ BackendInterface::fetchLedgerPage( std::optional const& cursor, std::uint32_t const ledgerSequence, std::uint32_t const limit, - std::uint32_t const limitHint, boost::asio::yield_context& yield) const { LedgerPage page; diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index 23494ae7..f4841f78 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -217,7 +217,6 @@ public: std::optional const& cursor, std::uint32_t const ledgerSequence, std::uint32_t const limit, - std::uint32_t const limitHint, boost::asio::yield_context& yield) const; // Fetches the successor to key/index diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 3112fe2a..6ff828cd 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -757,17 +757,8 @@ CassandraBackend::doOnlineDelete( std::optional cursor; while (true) { - auto [objects, curCursor, warning] = retryOnTimeout([&]() { - return fetchLedgerPage(cursor, minLedger, 256, 0, yield); - }); - if (warning) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ - << " online delete running but flag ledger is not complete"; - std::this_thread::sleep_for(std::chrono::seconds(10)); - continue; - } + auto [objects, curCursor] = retryOnTimeout( + [&]() { return fetchLedgerPage(cursor, minLedger, 256, yield); }); for (auto& obj : objects) { diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index 3ca85647..36d2d392 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -784,17 +784,8 @@ PostgresBackend::doOnlineDelete( std::optional cursor; while (true) { - auto [objects, curCursor, warning] = retryOnTimeout([&]() { - return fetchLedgerPage(cursor, minLedger, 256, 0, yield); - }); - if (warning) - { - BOOST_LOG_TRIVIAL(warning) << __func__ - << " online delete running but " - "flag ledger is not complete"; - std::this_thread::sleep_for(std::chrono::seconds(10)); - continue; - } + auto [objects, curCursor] = retryOnTimeout( + [&]() { return fetchLedgerPage(cursor, minLedger, 256, yield); }); BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; std::stringstream objectsBuffer; diff --git a/src/backend/Types.h b/src/backend/Types.h index 8e5badc2..95b4bf96 100644 --- a/src/backend/Types.h +++ b/src/backend/Types.h @@ -26,13 +26,11 @@ struct LedgerPage { std::vector objects; std::optional cursor; - std::optional warning; }; struct BookOffersPage { std::vector offers; std::optional cursor; - std::optional warning; }; struct TransactionAndMetadata { diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index d9aa90c8..cbb54198 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -832,15 +832,7 @@ ReportingETL::monitor() { } uint32_t nextSequence = latestSequence.value() + 1; - if (!backend_->cache().isFull()) - { - std::thread t{[this, latestSequence]() { - BOOST_LOG_TRIVIAL(info) << "Loading cache"; - loadBalancer_->loadInitialLedger(*latestSequence, true); - backend_->cache().setFull(); - }}; - t.detach(); - } + loadCacheAsync(*latestSequence); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -893,6 +885,46 @@ ReportingETL::monitor() } } +void +ReportingETL::loadCacheAsync(uint32_t seq) +{ + // sanity check to make sure we are not calling this multiple times + static std::atomic_bool loading = false; + if (loading) + { + assert(false); + return; + } + loading = true; + if (backend_->cache().isFull()) + { + assert(false); + return; + } + std::thread t{[this, seq]() { + BOOST_LOG_TRIVIAL(info) << "Loading cache"; + std::optional cursor; + while (true) + { + auto res = Backend::synchronousAndRetryOnTimeout( + [this, seq, &cursor](auto yield) { + return backend_->fetchLedgerPage(cursor, seq, 4096, yield); + }); + backend_->cache().update(res.objects, seq, true); + if (!res.cursor) + break; + BOOST_LOG_TRIVIAL(debug) + << "Loading cache. cache size = " << backend_->cache().size() + << " - cursor = " << ripple::strHex(res.cursor.value()); + cursor = std::move(res.cursor); + } + BOOST_LOG_TRIVIAL(info) << "Finished loading cache"; + + backend_->cache().setFull(); + }}; + t.detach(); +} + void ReportingETL::monitorReadOnly() { @@ -905,11 +937,7 @@ ReportingETL::monitorReadOnly() latestSequence = networkValidatedLedgers_->getMostRecent(); if (!latestSequence) return; - std::thread t{[this, latestSequence]() { - BOOST_LOG_TRIVIAL(info) << "Loading cache"; - loadBalancer_->loadInitialLedger(*latestSequence, true); - }}; - t.detach(); + loadCacheAsync(*latestSequence); latestSequence = *latestSequence + 1; while (true) { diff --git a/src/etl/ReportingETL.h b/src/etl/ReportingETL.h index 1e45fcf9..3880e286 100644 --- a/src/etl/ReportingETL.h +++ b/src/etl/ReportingETL.h @@ -147,9 +147,15 @@ private: std::optional loadInitialLedger(uint32_t sequence); - /// Run ETL. Extracts ledgers and writes them to the database, until a write - /// conflict occurs (or the server shuts down). - /// @note database must already be populated when this function is called + /// Populates the cache by walking through the given ledger. Should only be + /// called once + void + loadCacheAsync(uint32_t seq); + + /// Run ETL. Extracts ledgers and writes them to the database, until a + /// write conflict occurs (or the server shuts down). + /// @note database must already be populated when this function is + /// called /// @param startSequence the first ledger to extract /// @return the last ledger written to the database, if any std::optional diff --git a/src/rpc/handlers/BookOffers.cpp b/src/rpc/handlers/BookOffers.cpp index 4fa04ba2..05e1dca5 100644 --- a/src/rpc/handlers/BookOffers.cpp +++ b/src/rpc/handlers/BookOffers.cpp @@ -82,7 +82,7 @@ doBookOffers(Context const& context) } auto start = std::chrono::system_clock::now(); - auto [offers, retCursor, warning] = context.backend->fetchBookOffers( + auto [offers, retCursor] = context.backend->fetchBookOffers( bookBase, lgrInfo.seq, limit, cursor, context.yield); auto end = std::chrono::system_clock::now(); @@ -102,11 +102,6 @@ doBookOffers(Context const& context) if (retCursor) response["marker"] = ripple::strHex(*retCursor); - if (warning) - response["warning"] = - "Periodic database update in progress. Data for this book as of " - "this ledger " - "may be incomplete. Data should be complete within one minute"; return response; } diff --git a/src/rpc/handlers/LedgerData.cpp b/src/rpc/handlers/LedgerData.cpp index e6526902..15021d76 100644 --- a/src/rpc/handlers/LedgerData.cpp +++ b/src/rpc/handlers/LedgerData.cpp @@ -68,7 +68,7 @@ doLedgerData(Context const& context) Backend::LedgerPage page; auto start = std::chrono::system_clock::now(); page = context.backend->fetchLedgerPage( - cursor, lgrInfo.seq, limit, 0, context.yield); + cursor, lgrInfo.seq, limit, context.yield); auto end = std::chrono::system_clock::now(); @@ -137,17 +137,6 @@ doLedgerData(Context const& context) } response["state"] = objects; - if (cursor && page.warning) - { - response["warning"] = - "Periodic database update in progress. Data for this ledger may be " - "incomplete. Data should be complete " - "within a few minutes. Other RPC calls are not affected, " - "regardless of ledger. This " - "warning is only present on the first " - "page of the ledger"; - } - return response; } diff --git a/src/rpc/handlers/Subscribe.cpp b/src/rpc/handlers/Subscribe.cpp index d5018fe9..013838af 100644 --- a/src/rpc/handlers/Subscribe.cpp +++ b/src/rpc/handlers/Subscribe.cpp @@ -251,9 +251,8 @@ validateAndGetBooks( auto book, boost::asio::yield_context& yield) { auto bookBase = getBookBase(book); - auto [offers, retCursor, warning] = - backend->fetchBookOffers( - bookBase, rng->maxSequence, 200, {}, yield); + auto [offers, retCursor] = backend->fetchBookOffers( + bookBase, rng->maxSequence, 200, {}, yield); auto orderBook = postProcessOrderBook( offers, diff --git a/unittests/main.cpp b/unittests/main.cpp index 72974ee3..c9b1bd4f 100644 --- a/unittests/main.cpp +++ b/unittests/main.cpp @@ -804,7 +804,7 @@ TEST(BackendTest, Basic) { uint32_t limit = 10; page = backend->fetchLedgerPage( - page.cursor, seq, limit, 0, yield); + page.cursor, seq, limit, yield); std::cout << "fetched a page " << page.objects.size() << std::endl; if (page.cursor) @@ -817,7 +817,6 @@ TEST(BackendTest, Basic) page.objects.begin(), page.objects.end()); ++numLoops; - ASSERT_FALSE(page.warning.has_value()); } while (page.cursor); for (auto obj : objs) @@ -2187,7 +2186,7 @@ TEST(Backend, cacheIntegration) { uint32_t limit = 10; page = backend->fetchLedgerPage( - page.cursor, seq, limit, 0, yield); + page.cursor, seq, limit, yield); std::cout << "fetched a page " << page.objects.size() << std::endl; if (page.cursor) @@ -2200,7 +2199,6 @@ TEST(Backend, cacheIntegration) page.objects.begin(), page.objects.end()); ++numLoops; - ASSERT_FALSE(page.warning.has_value()); } while (page.cursor); for (auto obj : objs) {