diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp
index be14dfd1..aa364b31 100644
--- a/reporting/BackendIndexer.cpp
+++ b/reporting/BackendIndexer.cpp
@@ -17,13 +17,23 @@ BackendIndexer::~BackendIndexer()
 void
 BackendIndexer::addKey(ripple::uint256 const& key)
 {
+    std::unique_lock lck(mtx);
     keys.insert(key);
     keysCumulative.insert(key);
 }
 void
+BackendIndexer::addKeyAsync(ripple::uint256 const& key)
+{
+    std::unique_lock lck(mtx);
+    keysCumulative.insert(key);
+}
+void
 BackendIndexer::deleteKey(ripple::uint256 const& key)
 {
+    std::unique_lock lck(mtx);
     keysCumulative.erase(key);
+    if (populatingCacheAsync)
+        deletedKeys.insert(key);
 }
 
 void
@@ -31,15 +41,27 @@ BackendIndexer::addBookOffer(
     ripple::uint256 const& book,
     ripple::uint256 const& offerKey)
 {
+    std::unique_lock lck(mtx);
     books[book].insert(offerKey);
     booksCumulative[book].insert(offerKey);
 }
 void
+BackendIndexer::addBookOfferAsync(
+    ripple::uint256 const& book,
+    ripple::uint256 const& offerKey)
+{
+    std::unique_lock lck(mtx);
+    booksCumulative[book].insert(offerKey);
+}
+void
 BackendIndexer::deleteBookOffer(
     ripple::uint256 const& book,
     ripple::uint256 const& offerKey)
 {
+    std::unique_lock lck(mtx);
     booksCumulative[book].erase(offerKey);
+    if (populatingCacheAsync)
+        deletedBooks[book].insert(offerKey);
 }
 
 void
@@ -48,21 +70,18 @@ BackendIndexer::clearCaches()
     keysCumulative = {};
     booksCumulative = {};
 }
+
 void
 BackendIndexer::populateCaches(
     BackendInterface const& backend,
     std::optional<uint32_t> sequence)
 {
-    if (keysCumulative.size() > 0)
-    {
-        BOOST_LOG_TRIVIAL(info)
-            << __func__ << " caches already populated. returning";
-        return;
-    }
     if (!sequence)
         sequence = backend.fetchLatestLedgerSequence();
     if (!sequence)
         return;
+    BOOST_LOG_TRIVIAL(info)
+        << __func__ << " sequence = " << std::to_string(*sequence);
     std::optional<ripple::uint256> cursor;
     while (true)
     {
@@ -84,11 +103,11 @@ BackendIndexer::populateCaches(
             cursor = curCursor;
             for (auto& obj : objects)
             {
-                keysCumulative.insert(obj.key);
+                addKeyAsync(obj.key);
                 if (isOffer(obj.blob))
                 {
                     auto book = getBook(obj.blob);
-                    booksCumulative[book].insert(obj.key);
+                    addBookOfferAsync(book, obj.key);
                 }
             }
             if (!cursor)
@@ -101,6 +120,62 @@ BackendIndexer::populateCaches(
             std::this_thread::sleep_for(std::chrono::seconds(2));
         }
     }
+    // Do reconcilation. Remove anything from keys or books that shouldn't be
+    // there
+    {
+        std::unique_lock lck(mtx);
+        populatingCacheAsync = false;
+    }
+    auto tip = backend.fetchLatestLedgerSequence();
+    for (auto& key : deletedKeys)
+    {
+        deleteKey(key);
+    }
+    for (auto& book : deletedBooks)
+    {
+        for (auto& offer : book.second)
+        {
+            deleteBookOffer(book.first, offer);
+        }
+    }
+    {
+        std::unique_lock lck(mtx);
+        deletedKeys = {};
+        deletedBooks = {};
+        cv_.notify_one();
+    }
+    BOOST_LOG_TRIVIAL(info)
+        << __func__
+        << " finished. keys.size() = " << std::to_string(keysCumulative.size());
+}
+void
+BackendIndexer::populateCachesAsync(
+    BackendInterface const& backend,
+    std::optional<uint32_t> sequence)
+{
+    if (keysCumulative.size() > 0)
+    {
+        BOOST_LOG_TRIVIAL(info)
+            << __func__ << " caches already populated. returning";
+        return;
+    }
+    {
+        std::unique_lock lck(mtx);
+        populatingCacheAsync = true;
+    }
+    BOOST_LOG_TRIVIAL(info) << __func__;
+    boost::asio::post(ioc_, [this, sequence, &backend]() {
+        populateCaches(backend, sequence);
+    });
+}
+
+void
+BackendIndexer::waitForCaches()
+{
+    std::unique_lock lck(mtx);
+    cv_.wait(lck, [this]() {
+        return !populatingCacheAsync && deletedKeys.size() == 0;
+    });
 }
 
 void
@@ -119,6 +194,7 @@ BackendIndexer::writeNext(
 
     if (isFlag)
     {
+        waitForCaches();
         auto booksCopy = booksCumulative;
         auto keysCopy = keysCumulative;
         boost::asio::post(ioc_, [=, &backend]() {
diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h
index 899c910f..dfe937b1 100644
--- a/reporting/BackendInterface.h
+++ b/reporting/BackendInterface.h
@@ -70,30 +70,51 @@ class BackendIndexer
     std::thread ioThread_;
     uint32_t shift_ = 16;
     std::unordered_set<ripple::uint256> keys;
-    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
-        books;
     std::unordered_set<ripple::uint256> keysCumulative;
+    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
+        books;
     std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
         booksCumulative;
+    bool populatingCacheAsync = false;
+    // These are only used when the cache is being populated asynchronously
+    std::unordered_set<ripple::uint256> deletedKeys;
+    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
+        deletedBooks;
+    std::mutex mtx;
+    std::condition_variable cv_;
+
+    void
+    addKeyAsync(ripple::uint256 const& key);
+    void
+    addBookOfferAsync(
+        ripple::uint256 const& book,
+        ripple::uint256 const& offerKey);
 
 public:
     BackendIndexer(boost::json::object const& config);
     ~BackendIndexer();
 
+    void
+    populateCachesAsync(
+        BackendInterface const& backend,
+        std::optional<uint32_t> sequence = {});
     void
     populateCaches(
         BackendInterface const& backend,
         std::optional<uint32_t> sequence = {});
     void
     clearCaches();
+    // Blocking, possibly for minutes
+    void
+    waitForCaches();
     void
     addKey(ripple::uint256 const& key);
     void
     deleteKey(ripple::uint256 const& key);
-
     void
     addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey);
+
     void
     deleteBookOffer(
         ripple::uint256 const& book,
         ripple::uint256 const& offerKey);
@@ -253,7 +274,8 @@ public:
     // other database methods
 
     // Open the database. Set up all of the necessary objects and
-    // datastructures. After this call completes, the database is ready for use.
+    // datastructures. After this call completes, the database is ready for
+    // use.
     virtual void
     open(bool readOnly) = 0;
 
diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp
index 2860187b..54536140 100644
--- a/reporting/CassandraBackend.cpp
+++ b/reporting/CassandraBackend.cpp
@@ -289,73 +289,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
     return hashes;
 }
 
-LedgerPage
-CassandraBackend::fetchLedgerPage2(
-    std::optional<ripple::uint256> const& cursor,
-    std::uint32_t ledgerSequence,
-    std::uint32_t limit) const
-{
-    BOOST_LOG_TRIVIAL(trace) << __func__;
-    std::optional<ripple::uint256> currentCursor = cursor;
-    std::vector<LedgerObject> objects;
-    uint32_t curLimit = limit;
-    while (objects.size() < limit)
-    {
-        CassandraStatement statement{selectLedgerPage_};
-
-        int64_t intCursor = INT64_MIN;
-        if (currentCursor)
-        {
-            auto token = getToken(currentCursor->data());
-            if (token)
-                intCursor = *token;
-        }
-        BOOST_LOG_TRIVIAL(debug)
-            << __func__ << " - cursor = " << std::to_string(intCursor)
-            << " , sequence = " << std::to_string(ledgerSequence)
-            << ", - limit = " << std::to_string(limit);
-        statement.bindInt(intCursor);
-        statement.bindInt(ledgerSequence);
-        statement.bindUInt(curLimit);
-
-        CassandraResult result = executeSyncRead(statement);
-
-        if (!!result)
-        {
-            BOOST_LOG_TRIVIAL(debug)
-                << __func__ << " - got keys - size = " << result.numRows();
-
-            size_t prevSize = objects.size();
-            do
-            {
-                std::vector<unsigned char> object = result.getBytes();
-                if (object.size())
-                {
-                    objects.push_back({result.getUInt256(), std::move(object)});
-                }
-            } while (result.nextRow());
-            size_t prevBatchSize = objects.size() - prevSize;
-            BOOST_LOG_TRIVIAL(debug)
-                << __func__ << " - added to objects. size = " << objects.size();
-            if (result.numRows() < curLimit)
-            {
-                currentCursor = {};
-                break;
-            }
-            if (objects.size() < limit)
-            {
-                curLimit = 2048;
-            }
-            assert(objects.size());
-            currentCursor = objects[objects.size() - 1].key;
-        }
-    }
-    if (objects.size())
-        return {objects, currentCursor};
-
-    return {{}, {}};
-}
-
 struct ReadDiffCallbackData
 {
     CassandraBackend const& backend;
@@ -480,12 +413,12 @@ CassandraBackend::fetchLedgerPage(
     if (!index)
         return {};
     LedgerPage page;
-    if (cursor)
-        BOOST_LOG_TRIVIAL(debug)
-            << __func__ << " - Cursor = " << ripple::strHex(*cursor);
     BOOST_LOG_TRIVIAL(debug)
         << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
         << " index = " << std::to_string(*index);
+    if (cursor)
+        BOOST_LOG_TRIVIAL(debug)
+            << __func__ << " - Cursor = " << ripple::strHex(*cursor);
     CassandraStatement statement{selectKeys_};
     statement.bindInt(*index);
     if (cursor)
@@ -497,10 +430,9 @@ CassandraBackend::fetchLedgerPage(
     }
     statement.bindUInt(limit);
     CassandraResult result = executeSyncRead(statement);
-    BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys";
     if (!!result)
     {
-        BOOST_LOG_TRIVIAL(debug)
+        BOOST_LOG_TRIVIAL(trace)
            << __func__ << " - got keys - size = " << result.numRows();
 
         std::vector<ripple::uint256> keys;
@@ -508,17 +440,14 @@ CassandraBackend::fetchLedgerPage(
         {
             keys.push_back(result.getUInt256());
         } while (result.nextRow());
-        BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys";
         auto objects = fetchLedgerObjects(keys, ledgerSequence);
-        BOOST_LOG_TRIVIAL(debug)
-            << __func__ << " Using base ledger. Got objects";
 
         if (objects.size() != keys.size())
             throw std::runtime_error("Mismatch in size of objects and keys");
         if (keys.size() == limit)
             page.cursor = keys[keys.size() - 1];
         if (cursor)
-            BOOST_LOG_TRIVIAL(debug)
+            BOOST_LOG_TRIVIAL(trace)
                 << __func__ << " Cursor = " << ripple::strHex(*page.cursor);
 
         for (size_t i = 0; i < objects.size(); ++i)
@@ -530,7 +459,7 @@ CassandraBackend::fetchLedgerPage(
                 page.objects.push_back({std::move(key), std::move(obj)});
             }
         }
-        if (keys.size() && keys[0].isZero())
+        if (keys.size() && !cursor && !keys[0].isZero())
             page.warning = "Data may be incomplete";
         return page;
     }
diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h
index 3b89f475..e5732a2b 100644
--- a/reporting/CassandraBackend.h
+++ b/reporting/CassandraBackend.h
@@ -945,11 +945,6 @@ public:
         return {{result.getBytes(), result.getBytes(), result.getUInt32()}};
     }
     LedgerPage
-    fetchLedgerPage2(
-        std::optional<ripple::uint256> const& cursor,
-        std::uint32_t ledgerSequence,
-        std::uint32_t limit) const;
-    LedgerPage
     fetchLedgerPage(
         std::optional<ripple::uint256> const& cursor,
         std::uint32_t ledgerSequence,
diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp
index 5b10a274..969f6572 100644
--- a/reporting/PostgresBackend.cpp
+++ b/reporting/PostgresBackend.cpp
@@ -362,7 +362,7 @@ PostgresBackend::fetchLedgerPage(
             results.push_back({keys[i], objs[i]});
         }
     }
-    if (keys[0].isZero())
+    if (!cursor && !keys[0].isZero())
         return {results, returnCursor, "Data may be incomplete"};
     return {results, returnCursor};
 }
diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp
index 270d8006..5a15a581 100644
--- a/reporting/ReportingETL.cpp
+++ b/reporting/ReportingETL.cpp
@@ -367,7 +367,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
     }
     BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                             << "Populating caches";
-    flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_);
+
+    flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_);
     BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                             << "Populated caches";