diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 3a134b2e..af1b7089 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -324,7 +324,7 @@ doBookOffers( } auto start = std::chrono::system_clock::now(); - auto [offers, retCursor] = + auto [offers, retCursor, warning] = backend.fetchBookOffers(bookBase, *ledgerSequence, limit, cursor); auto end = std::chrono::system_clock::now(); @@ -361,6 +361,11 @@ doBookOffers( << ((end - start).count() / 1000000000.0); if (retCursor) response["cursor"] = ripple::strHex(*retCursor); + if (warning) + response["warning"] = + "Periodic database update in progress. Data for this book as of " + "this ledger " + "may be incomplete. Data should be complete within one minute"; return response; } diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 72f829b3..c43bf709 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -100,6 +100,16 @@ doLedgerData( response["num_results"] = results.size(); response["db_time"] = time; response["time_per_result"] = time / (results.size() ? results.size() : 1); + if (page.warning) + { + response["warning"] = + "Periodic database update in progress. Data for this ledger may be " + "incomplete. Data should be complete " + "within a few minutes. Other RPC calls are not affected, " + "regardless of ledger. This " + "warning is only present on the first " + "page of the ledger"; + } return response; } diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index 58b2388a..be14dfd1 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -49,7 +49,9 @@ BackendIndexer::clearCaches() booksCumulative = {}; } void -BackendIndexer::populateCaches(BackendInterface const& backend) +BackendIndexer::populateCaches( + BackendInterface const& backend, + std::optional sequence) { if (keysCumulative.size() > 0) { @@ -57,16 +59,27 @@ BackendIndexer::populateCaches(BackendInterface const& backend) << __func__ << " caches already populated. returning"; return; } - auto tip = backend.fetchLatestLedgerSequence(); - if (!tip) + if (!sequence) + sequence = backend.fetchLatestLedgerSequence(); + if (!sequence) return; std::optional cursor; while (true) { try { - auto [objects, curCursor] = - backend.fetchLedgerPage(cursor, *tip, 2048); + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, *sequence, 2048); + if (warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " performing index repair"; + uint32_t lower = (*sequence - 1) >> shift_ << shift_; + populateCaches(backend, lower); + writeNext(lower, backend); + clearCaches(); + continue; + } BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) @@ -106,16 +119,23 @@ BackendIndexer::writeNext( if (isFlag) { - uint32_t nextSeq = - ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); - BOOST_LOG_TRIVIAL(info) - << __func__ << " actually doing the write. keysCumulative.size() = " - << std::to_string(keysCumulative.size()); - backend.writeKeys(keysCumulative, nextSeq); - BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys"; - - backend.writeBooks(booksCumulative, nextSeq); - BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books"; + auto booksCopy = booksCumulative; + auto keysCopy = keysCumulative; + boost::asio::post(ioc_, [=, &backend]() { + uint32_t nextSeq = + ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); + ripple::uint256 zero = {}; + BOOST_LOG_TRIVIAL(info) << __func__ << " booksCumulative.size() = " + << std::to_string(booksCumulative.size()); + backend.writeBooks(booksCopy, nextSeq); + backend.writeBooks({{zero, {zero}}}, nextSeq); + BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books"; + BOOST_LOG_TRIVIAL(info) << __func__ << " keysCumulative.size() = " + << std::to_string(keysCumulative.size()); + backend.writeKeys(keysCopy, nextSeq); + backend.writeKeys({zero}, nextSeq); + BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys"; + }); } } diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index dfb5dc14..899c910f 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -26,6 +26,13 @@ struct LedgerPage { std::vector objects; std::optional cursor; + std::optional warning; +}; +struct BookOffersPage +{ + std::vector offers; + std::optional cursor; + std::optional warning; }; struct TransactionAndMetadata { @@ -74,7 +81,9 @@ public: ~BackendIndexer(); void - populateCaches(BackendInterface const& backend); + populateCaches( + BackendInterface const& backend, + std::optional sequence = {}); void clearCaches(); @@ -160,7 +169,8 @@ public: std::uint32_t ledgerSequence, std::uint32_t limit) const = 0; - virtual std::pair, std::optional> + // TODO add warning for incomplete data + virtual BookOffersPage fetchBookOffers( ripple::uint256 const& book, uint32_t ledgerSequence, diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 3e3e9b41..2860187b 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -530,6 +530,8 @@ CassandraBackend::fetchLedgerPage( page.objects.push_back({std::move(key), std::move(obj)}); } } + if (keys.size() && keys[0].isZero()) + page.warning = "Data may be incomplete"; return page; } return {{}, {}}; @@ -568,7 +570,7 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } -std::pair, std::optional> +BookOffersPage CassandraBackend::fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, @@ -613,8 +615,14 @@ CassandraBackend::fetchBookOffers( if (objs[i].size() != 0) results.push_back({keys[i], objs[i]}); } - if (keys.size()) - return {results, keys[keys.size() - 1]}; + std::optional warning; + if (keys[0].isZero()) + warning = "Data may be incomplete"; + if (keys.size() == limit) + return {results, keys[keys.size() - 1], warning}; + else + return {results, {}, warning}; + return {{}, {}}; } diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index c7925c37..3b89f475 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -977,7 +977,7 @@ public: ripple::uint256, std::unordered_set> const& books, uint32_t ledgerSequence) const override; - std::pair, std::optional> + BookOffersPage fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 850e9930..5b10a274 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -338,7 +338,7 @@ PostgresBackend::fetchLedgerPage( sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index); if (cursor) sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY key DESC LIMIT " << std::to_string(limit); + sql << " ORDER BY key ASC LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); auto res = pgQuery(sql.str().data()); BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys"; @@ -362,12 +362,14 @@ PostgresBackend::fetchLedgerPage( results.push_back({keys[i], objs[i]}); } } + if (keys[0].isZero()) + return {results, returnCursor, "Data may be incomplete"}; return {results, returnCursor}; } return {}; } -std::pair, std::optional> +BookOffersPage PostgresBackend::fetchBookOffers( ripple::uint256 const& book, uint32_t ledgerSequence, @@ -392,6 +394,9 @@ PostgresBackend::fetchBookOffers( { keys.push_back(res.asUInt256(i, 0)); } + std::optional warning; + if (keys[0].isZero()) + warning = "Data may be incomplete"; std::vector blobs = fetchLedgerObjects(keys, ledgerSequence); std::vector results; @@ -409,10 +414,10 @@ PostgresBackend::fetchBookOffers( BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << ripple::strHex(results[0].key) << " : " << ripple::strHex(results[results.size() - 1].key); - return {results, results[results.size() - 1].key}; + return {results, results[results.size() - 1].key, warning}; } else - return {results, {}}; + return {results, {}, warning}; } return {{}, {}}; } diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 30c2df01..84835417 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -50,7 +50,7 @@ public: std::uint32_t ledgerSequence, std::uint32_t limit) const override; - std::pair, std::optional> + BookOffersPage fetchBookOffers( ripple::uint256 const& book, uint32_t ledgerSequence,