Load cache from our own database

This commit is contained in:
CJ Cobb
2022-03-04 16:34:47 -05:00
parent a6afbd0945
commit b332c388d2
11 changed files with 61 additions and 68 deletions

View File

@@ -244,7 +244,6 @@ BackendInterface::fetchLedgerPage(
std::optional<ripple::uint256> const& cursor, std::optional<ripple::uint256> const& cursor,
std::uint32_t const ledgerSequence, std::uint32_t const ledgerSequence,
std::uint32_t const limit, std::uint32_t const limit,
std::uint32_t const limitHint,
boost::asio::yield_context& yield) const boost::asio::yield_context& yield) const
{ {
LedgerPage page; LedgerPage page;

View File

@@ -217,7 +217,6 @@ public:
std::optional<ripple::uint256> const& cursor, std::optional<ripple::uint256> const& cursor,
std::uint32_t const ledgerSequence, std::uint32_t const ledgerSequence,
std::uint32_t const limit, std::uint32_t const limit,
std::uint32_t const limitHint,
boost::asio::yield_context& yield) const; boost::asio::yield_context& yield) const;
// Fetches the successor to key/index // Fetches the successor to key/index

View File

@@ -757,17 +757,8 @@ CassandraBackend::doOnlineDelete(
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
while (true) while (true)
{ {
auto [objects, curCursor, warning] = retryOnTimeout([&]() { auto [objects, curCursor] = retryOnTimeout(
return fetchLedgerPage(cursor, minLedger, 256, 0, yield); [&]() { return fetchLedgerPage(cursor, minLedger, 256, yield); });
});
if (warning)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__
<< " online delete running but flag ledger is not complete";
std::this_thread::sleep_for(std::chrono::seconds(10));
continue;
}
for (auto& obj : objects) for (auto& obj : objects)
{ {

View File

@@ -784,17 +784,8 @@ PostgresBackend::doOnlineDelete(
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
while (true) while (true)
{ {
auto [objects, curCursor, warning] = retryOnTimeout([&]() { auto [objects, curCursor] = retryOnTimeout(
return fetchLedgerPage(cursor, minLedger, 256, 0, yield); [&]() { return fetchLedgerPage(cursor, minLedger, 256, yield); });
});
if (warning)
{
BOOST_LOG_TRIVIAL(warning) << __func__
<< " online delete running but "
"flag ledger is not complete";
std::this_thread::sleep_for(std::chrono::seconds(10));
continue;
}
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
std::stringstream objectsBuffer; std::stringstream objectsBuffer;

View File

@@ -26,13 +26,11 @@ struct LedgerPage
{ {
std::vector<LedgerObject> objects; std::vector<LedgerObject> objects;
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
std::optional<std::string> warning;
}; };
struct BookOffersPage struct BookOffersPage
{ {
std::vector<LedgerObject> offers; std::vector<LedgerObject> offers;
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
std::optional<std::string> warning;
}; };
struct TransactionAndMetadata struct TransactionAndMetadata
{ {

View File

@@ -832,15 +832,7 @@ ReportingETL::monitor()
{ {
} }
uint32_t nextSequence = latestSequence.value() + 1; uint32_t nextSequence = latestSequence.value() + 1;
if (!backend_->cache().isFull()) loadCacheAsync(*latestSequence);
{
std::thread t{[this, latestSequence]() {
BOOST_LOG_TRIVIAL(info) << "Loading cache";
loadBalancer_->loadInitialLedger(*latestSequence, true);
backend_->cache().setFull();
}};
t.detach();
}
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
@@ -893,6 +885,46 @@ ReportingETL::monitor()
} }
} }
void
ReportingETL::loadCacheAsync(uint32_t seq)
{
// sanity check to make sure we are not calling this multiple times
static std::atomic_bool loading = false;
if (loading)
{
assert(false);
return;
}
loading = true;
if (backend_->cache().isFull())
{
assert(false);
return;
}
std::thread t{[this, seq]() {
BOOST_LOG_TRIVIAL(info) << "Loading cache";
std::optional<ripple::uint256> cursor;
while (true)
{
auto res = Backend::synchronousAndRetryOnTimeout(
[this, seq, &cursor](auto yield) {
return backend_->fetchLedgerPage(cursor, seq, 4096, yield);
});
backend_->cache().update(res.objects, seq, true);
if (!res.cursor)
break;
BOOST_LOG_TRIVIAL(debug)
<< "Loading cache. cache size = " << backend_->cache().size()
<< " - cursor = " << ripple::strHex(res.cursor.value());
cursor = std::move(res.cursor);
}
BOOST_LOG_TRIVIAL(info) << "Finished loading cache";
backend_->cache().setFull();
}};
t.detach();
}
void void
ReportingETL::monitorReadOnly() ReportingETL::monitorReadOnly()
{ {
@@ -905,11 +937,7 @@ ReportingETL::monitorReadOnly()
latestSequence = networkValidatedLedgers_->getMostRecent(); latestSequence = networkValidatedLedgers_->getMostRecent();
if (!latestSequence) if (!latestSequence)
return; return;
std::thread t{[this, latestSequence]() { loadCacheAsync(*latestSequence);
BOOST_LOG_TRIVIAL(info) << "Loading cache";
loadBalancer_->loadInitialLedger(*latestSequence, true);
}};
t.detach();
latestSequence = *latestSequence + 1; latestSequence = *latestSequence + 1;
while (true) while (true)
{ {

View File

@@ -147,9 +147,15 @@ private:
std::optional<ripple::LedgerInfo> std::optional<ripple::LedgerInfo>
loadInitialLedger(uint32_t sequence); loadInitialLedger(uint32_t sequence);
/// Run ETL. Extracts ledgers and writes them to the database, until a write /// Populates the cache by walking through the given ledger. Should only be
/// conflict occurs (or the server shuts down). /// called once
/// @note database must already be populated when this function is called void
loadCacheAsync(uint32_t seq);
/// Run ETL. Extracts ledgers and writes them to the database, until a
/// write conflict occurs (or the server shuts down).
/// @note database must already be populated when this function is
/// called
/// @param startSequence the first ledger to extract /// @param startSequence the first ledger to extract
/// @return the last ledger written to the database, if any /// @return the last ledger written to the database, if any
std::optional<uint32_t> std::optional<uint32_t>

View File

@@ -82,7 +82,7 @@ doBookOffers(Context const& context)
} }
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
auto [offers, retCursor, warning] = context.backend->fetchBookOffers( auto [offers, retCursor] = context.backend->fetchBookOffers(
bookBase, lgrInfo.seq, limit, cursor, context.yield); bookBase, lgrInfo.seq, limit, cursor, context.yield);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
@@ -102,11 +102,6 @@ doBookOffers(Context const& context)
if (retCursor) if (retCursor)
response["marker"] = ripple::strHex(*retCursor); response["marker"] = ripple::strHex(*retCursor);
if (warning)
response["warning"] =
"Periodic database update in progress. Data for this book as of "
"this ledger "
"may be incomplete. Data should be complete within one minute";
return response; return response;
} }

View File

@@ -68,7 +68,7 @@ doLedgerData(Context const& context)
Backend::LedgerPage page; Backend::LedgerPage page;
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
page = context.backend->fetchLedgerPage( page = context.backend->fetchLedgerPage(
cursor, lgrInfo.seq, limit, 0, context.yield); cursor, lgrInfo.seq, limit, context.yield);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
@@ -137,17 +137,6 @@ doLedgerData(Context const& context)
} }
response["state"] = objects; response["state"] = objects;
if (cursor && page.warning)
{
response["warning"] =
"Periodic database update in progress. Data for this ledger may be "
"incomplete. Data should be complete "
"within a few minutes. Other RPC calls are not affected, "
"regardless of ledger. This "
"warning is only present on the first "
"page of the ledger";
}
return response; return response;
} }

View File

@@ -251,9 +251,8 @@ validateAndGetBooks(
auto book, auto book,
boost::asio::yield_context& yield) { boost::asio::yield_context& yield) {
auto bookBase = getBookBase(book); auto bookBase = getBookBase(book);
auto [offers, retCursor, warning] = auto [offers, retCursor] = backend->fetchBookOffers(
backend->fetchBookOffers( bookBase, rng->maxSequence, 200, {}, yield);
bookBase, rng->maxSequence, 200, {}, yield);
auto orderBook = postProcessOrderBook( auto orderBook = postProcessOrderBook(
offers, offers,

View File

@@ -804,7 +804,7 @@ TEST(BackendTest, Basic)
{ {
uint32_t limit = 10; uint32_t limit = 10;
page = backend->fetchLedgerPage( page = backend->fetchLedgerPage(
page.cursor, seq, limit, 0, yield); page.cursor, seq, limit, yield);
std::cout << "fetched a page " << page.objects.size() std::cout << "fetched a page " << page.objects.size()
<< std::endl; << std::endl;
if (page.cursor) if (page.cursor)
@@ -817,7 +817,6 @@ TEST(BackendTest, Basic)
page.objects.begin(), page.objects.begin(),
page.objects.end()); page.objects.end());
++numLoops; ++numLoops;
ASSERT_FALSE(page.warning.has_value());
} while (page.cursor); } while (page.cursor);
for (auto obj : objs) for (auto obj : objs)
@@ -2187,7 +2186,7 @@ TEST(Backend, cacheIntegration)
{ {
uint32_t limit = 10; uint32_t limit = 10;
page = backend->fetchLedgerPage( page = backend->fetchLedgerPage(
page.cursor, seq, limit, 0, yield); page.cursor, seq, limit, yield);
std::cout << "fetched a page " << page.objects.size() std::cout << "fetched a page " << page.objects.size()
<< std::endl; << std::endl;
if (page.cursor) if (page.cursor)
@@ -2200,7 +2199,6 @@ TEST(Backend, cacheIntegration)
page.objects.begin(), page.objects.begin(),
page.objects.end()); page.objects.end());
++numLoops; ++numLoops;
ASSERT_FALSE(page.warning.has_value());
} while (page.cursor); } while (page.cursor);
for (auto obj : objs) for (auto obj : objs)
{ {