From 4e58c76eac071291bc0a78f266d71b7c71f47f13 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Mon, 3 May 2021 21:16:25 +0000 Subject: [PATCH 1/5] Fix crashing bug related to account_tx writes --- reporting/CassandraBackend.cpp | 29 +++++++++++--------- reporting/CassandraBackend.h | 49 ++++++++++++++++++++++------------ 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 3368b57e..3e3e9b41 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -24,12 +24,13 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) auto rc = cass_future_error_code(fut); if (rc != CASS_OK) { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; // exponential backoff with a max wait of 2^10 ms (about 1 second) auto wait = std::chrono::milliseconds( lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra ETL insert error: " << rc << ", " + << cass_error_desc(rc) << ", retrying in " << wait.count() + << " milliseconds"; ++requestParams.currentRetries; std::shared_ptr timer = std::make_shared( @@ -42,6 +43,8 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) } else { + BOOST_LOG_TRIVIAL(trace) + << __func__ << " Succesfully inserted a record"; backend.finishAsyncWrite(); int remaining = --requestParams.refs; if (remaining == 0) @@ -668,12 +671,13 @@ writeBookCallback(CassFuture* fut, void* cbData) auto rc = cass_future_error_code(fut); if (rc != CASS_OK) { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert key error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; // exponential backoff with a max wait of 2^10 ms (about 1 second) auto wait = std::chrono::milliseconds( lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert book error: " << rc << ", " + << cass_error_desc(rc) << ", retrying in " << wait.count() + << " milliseconds"; ++requestParams.currentRetries; std::shared_ptr timer = std::make_shared( @@ -686,7 +690,7 @@ writeBookCallback(CassFuture* fut, void* cbData) } else { - BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request"; + BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; { std::lock_guard lck(requestParams.mtx); --requestParams.numRemaining; @@ -742,12 +746,13 @@ writeKeyCallback(CassFuture* fut, void* cbData) auto rc = cass_future_error_code(fut); if (rc != CASS_OK) { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert key error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) auto wait = std::chrono::milliseconds( lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert key error: " << rc << ", " + << cass_error_desc(rc) << ", retrying in " << wait.count() + << " milliseconds"; + // exponential backoff with a max wait of 2^10 ms (about 1 second) ++requestParams.currentRetries; std::shared_ptr timer = std::make_shared( @@ -760,7 +765,7 @@ writeKeyCallback(CassFuture* fut, void* cbData) } else { - BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request"; + BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key"; { std::lock_guard lck(requestParams.mtx); --requestParams.numRemaining; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index d0175416..c7925c37 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -1185,18 +1185,29 @@ public: { } }; + struct WriteAccountTxCallbackData { CassandraBackend const* backend; - AccountTransactionsData data; + ripple::AccountID account; + uint32_t ledgerSequence; + uint32_t transactionIndex; + ripple::uint256 txHash; uint32_t currentRetries = 0; - std::atomic refs; + std::atomic refs = 1; WriteAccountTxCallbackData( CassandraBackend const* f, - AccountTransactionsData&& in) - : backend(f), data(std::move(in)), refs(data.accounts.size()) + ripple::AccountID&& account, + uint32_t lgrSeq, + uint32_t txIdx, + ripple::uint256&& hash) + : backend(f) + , account(std::move(account)) + , ledgerSequence(lgrSeq) + , transactionIndex(txIdx) + , txHash(std::move(hash)) { } }; @@ -1291,26 +1302,30 @@ public: { for (auto& record : data) { - WriteAccountTxCallbackData* cbData = - new WriteAccountTxCallbackData(this, std::move(record)); - writeAccountTx(*cbData, false); + for (auto& account : record.accounts) + { + WriteAccountTxCallbackData* cbData = + new WriteAccountTxCallbackData( + this, + std::move(account), + record.ledgerSequence, + record.transactionIndex, + std::move(record.txHash)); + writeAccountTx(*cbData, false); + } } } void writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const { - for (auto const& account : data.data.accounts) - { - CassandraStatement statement(insertAccountTx_); - statement.bindBytes(account); - statement.bindIntTuple( - data.data.ledgerSequence, data.data.transactionIndex); - statement.bindBytes(data.data.txHash); + CassandraStatement statement(insertAccountTx_); + statement.bindBytes(data.account); + statement.bindIntTuple(data.ledgerSequence, data.transactionIndex); + statement.bindBytes(data.txHash); - executeAsyncWrite( - statement, flatMapWriteAccountTxCallback, data, isRetry); - } + executeAsyncWrite( + statement, flatMapWriteAccountTxCallback, data, isRetry); } struct WriteTransactionCallbackData From 971437f456c849a3ef4fe4e999dbc75af1a37543 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 30 Apr 2021 04:41:51 +0000 Subject: [PATCH 2/5] async write to keys and books with recovery and warnings --- handlers/BookOffers.cpp | 7 ++++- handlers/LedgerData.cpp | 10 +++++++ reporting/BackendIndexer.cpp | 50 ++++++++++++++++++++++++---------- reporting/BackendInterface.h | 14 ++++++++-- reporting/CassandraBackend.cpp | 14 ++++++++-- reporting/CassandraBackend.h | 2 +- reporting/PostgresBackend.cpp | 13 ++++++--- reporting/PostgresBackend.h | 2 +- 8 files changed, 85 insertions(+), 27 deletions(-) diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 3a134b2e..af1b7089 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -324,7 +324,7 @@ doBookOffers( } auto start = std::chrono::system_clock::now(); - auto [offers, retCursor] = + auto [offers, retCursor, warning] = backend.fetchBookOffers(bookBase, *ledgerSequence, limit, cursor); auto end = std::chrono::system_clock::now(); @@ -361,6 +361,11 @@ doBookOffers( << ((end - start).count() / 1000000000.0); if (retCursor) response["cursor"] = ripple::strHex(*retCursor); + if (warning) + response["warning"] = + "Periodic database update in progress. Data for this book as of " + "this ledger " + "may be incomplete. Data should be complete within one minute"; return response; } diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 72f829b3..c43bf709 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -100,6 +100,16 @@ doLedgerData( response["num_results"] = results.size(); response["db_time"] = time; response["time_per_result"] = time / (results.size() ? results.size() : 1); + if (page.warning) + { + response["warning"] = + "Periodic database update in progress. Data for this ledger may be " + "incomplete. Data should be complete " + "within a few minutes. Other RPC calls are not affected, " + "regardless of ledger. This " + "warning is only present on the first " + "page of the ledger"; + } return response; } diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index 58b2388a..be14dfd1 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -49,7 +49,9 @@ BackendIndexer::clearCaches() booksCumulative = {}; } void -BackendIndexer::populateCaches(BackendInterface const& backend) +BackendIndexer::populateCaches( + BackendInterface const& backend, + std::optional sequence) { if (keysCumulative.size() > 0) { @@ -57,16 +59,27 @@ BackendIndexer::populateCaches(BackendInterface const& backend) << __func__ << " caches already populated. returning"; return; } - auto tip = backend.fetchLatestLedgerSequence(); - if (!tip) + if (!sequence) + sequence = backend.fetchLatestLedgerSequence(); + if (!sequence) return; std::optional cursor; while (true) { try { - auto [objects, curCursor] = - backend.fetchLedgerPage(cursor, *tip, 2048); + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, *sequence, 2048); + if (warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " performing index repair"; + uint32_t lower = (*sequence - 1) >> shift_ << shift_; + populateCaches(backend, lower); + writeNext(lower, backend); + clearCaches(); + continue; + } BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) @@ -106,16 +119,23 @@ BackendIndexer::writeNext( if (isFlag) { - uint32_t nextSeq = - ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); - BOOST_LOG_TRIVIAL(info) - << __func__ << " actually doing the write. keysCumulative.size() = " - << std::to_string(keysCumulative.size()); - backend.writeKeys(keysCumulative, nextSeq); - BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys"; - - backend.writeBooks(booksCumulative, nextSeq); - BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books"; + auto booksCopy = booksCumulative; + auto keysCopy = keysCumulative; + boost::asio::post(ioc_, [=, &backend]() { + uint32_t nextSeq = + ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); + ripple::uint256 zero = {}; + BOOST_LOG_TRIVIAL(info) << __func__ << " booksCumulative.size() = " + << std::to_string(booksCumulative.size()); + backend.writeBooks(booksCopy, nextSeq); + backend.writeBooks({{zero, {zero}}}, nextSeq); + BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books"; + BOOST_LOG_TRIVIAL(info) << __func__ << " keysCumulative.size() = " + << std::to_string(keysCumulative.size()); + backend.writeKeys(keysCopy, nextSeq); + backend.writeKeys({zero}, nextSeq); + BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys"; + }); } } diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index dfb5dc14..899c910f 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -26,6 +26,13 @@ struct LedgerPage { std::vector objects; std::optional cursor; + std::optional warning; +}; +struct BookOffersPage +{ + std::vector offers; + std::optional cursor; + std::optional warning; }; struct TransactionAndMetadata { @@ -74,7 +81,9 @@ public: ~BackendIndexer(); void - populateCaches(BackendInterface const& backend); + populateCaches( + BackendInterface const& backend, + std::optional sequence = {}); void clearCaches(); @@ -160,7 +169,8 @@ public: std::uint32_t ledgerSequence, std::uint32_t limit) const = 0; - virtual std::pair, std::optional> + // TODO add warning for incomplete data + virtual BookOffersPage fetchBookOffers( ripple::uint256 const& book, uint32_t ledgerSequence, diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 3e3e9b41..2860187b 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -530,6 +530,8 @@ CassandraBackend::fetchLedgerPage( page.objects.push_back({std::move(key), std::move(obj)}); } } + if (keys.size() && keys[0].isZero()) + page.warning = "Data may be incomplete"; return page; } return {{}, {}}; @@ -568,7 +570,7 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } -std::pair, std::optional> +BookOffersPage CassandraBackend::fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, @@ -613,8 +615,14 @@ CassandraBackend::fetchBookOffers( if (objs[i].size() != 0) results.push_back({keys[i], objs[i]}); } - if (keys.size()) - return {results, keys[keys.size() - 1]}; + std::optional warning; + if (keys[0].isZero()) + warning = "Data may be incomplete"; + if (keys.size() == limit) + return {results, keys[keys.size() - 1], warning}; + else + return {results, {}, warning}; + return {{}, {}}; } diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index c7925c37..3b89f475 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -977,7 +977,7 @@ public: ripple::uint256, std::unordered_set> const& books, uint32_t ledgerSequence) const override; - std::pair, std::optional> + BookOffersPage fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 850e9930..5b10a274 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -338,7 +338,7 @@ PostgresBackend::fetchLedgerPage( sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index); if (cursor) sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY key DESC LIMIT " << std::to_string(limit); + sql << " ORDER BY key ASC LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); auto res = pgQuery(sql.str().data()); BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys"; @@ -362,12 +362,14 @@ PostgresBackend::fetchLedgerPage( results.push_back({keys[i], objs[i]}); } } + if (keys[0].isZero()) + return {results, returnCursor, "Data may be incomplete"}; return {results, returnCursor}; } return {}; } -std::pair, std::optional> +BookOffersPage PostgresBackend::fetchBookOffers( ripple::uint256 const& book, uint32_t ledgerSequence, @@ -392,6 +394,9 @@ PostgresBackend::fetchBookOffers( { keys.push_back(res.asUInt256(i, 0)); } + std::optional warning; + if (keys[0].isZero()) + warning = "Data may be incomplete"; std::vector blobs = fetchLedgerObjects(keys, ledgerSequence); std::vector results; @@ -409,10 +414,10 @@ PostgresBackend::fetchBookOffers( BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << ripple::strHex(results[0].key) << " : " << ripple::strHex(results[results.size() - 1].key); - return {results, results[results.size() - 1].key}; + return {results, results[results.size() - 1].key, warning}; } else - return {results, {}}; + return {results, {}, warning}; } return {{}, {}}; } diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 30c2df01..84835417 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -50,7 +50,7 @@ public: std::uint32_t ledgerSequence, std::uint32_t limit) const override; - std::pair, std::optional> + BookOffersPage fetchBookOffers( ripple::uint256 const& book, uint32_t ledgerSequence, From 64d0c5d05057ab4dc4061ddea76dd36802977f61 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 30 Apr 2021 20:15:38 +0000 Subject: [PATCH 3/5] async populate caches --- reporting/BackendIndexer.cpp | 92 +++++++++++++++++++++++++++++++--- reporting/BackendInterface.h | 30 +++++++++-- reporting/CassandraBackend.cpp | 83 +++--------------------------- reporting/CassandraBackend.h | 5 -- reporting/PostgresBackend.cpp | 2 +- reporting/ReportingETL.cpp | 3 +- 6 files changed, 119 insertions(+), 96 deletions(-) diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index be14dfd1..aa364b31 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -17,13 +17,23 @@ BackendIndexer::~BackendIndexer() void BackendIndexer::addKey(ripple::uint256 const& key) { + std::unique_lock lck(mtx); keys.insert(key); keysCumulative.insert(key); } void +BackendIndexer::addKeyAsync(ripple::uint256 const& key) +{ + std::unique_lock lck(mtx); + keysCumulative.insert(key); +} +void BackendIndexer::deleteKey(ripple::uint256 const& key) { + std::unique_lock lck(mtx); keysCumulative.erase(key); + if (populatingCacheAsync) + deletedKeys.insert(key); } void @@ -31,15 +41,27 @@ BackendIndexer::addBookOffer( ripple::uint256 const& book, ripple::uint256 const& offerKey) { + std::unique_lock lck(mtx); books[book].insert(offerKey); booksCumulative[book].insert(offerKey); } void +BackendIndexer::addBookOfferAsync( + ripple::uint256 const& book, + ripple::uint256 const& offerKey) +{ + std::unique_lock lck(mtx); + booksCumulative[book].insert(offerKey); +} +void BackendIndexer::deleteBookOffer( ripple::uint256 const& book, ripple::uint256 const& offerKey) { + std::unique_lock lck(mtx); booksCumulative[book].erase(offerKey); + if (populatingCacheAsync) + deletedBooks[book].insert(offerKey); } void @@ -48,21 +70,18 @@ BackendIndexer::clearCaches() keysCumulative = {}; booksCumulative = {}; } + void BackendIndexer::populateCaches( BackendInterface const& backend, std::optional sequence) { - if (keysCumulative.size() > 0) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " caches already populated. returning"; - return; - } if (!sequence) sequence = backend.fetchLatestLedgerSequence(); if (!sequence) return; + BOOST_LOG_TRIVIAL(info) + << __func__ << " sequence = " << std::to_string(*sequence); std::optional cursor; while (true) { @@ -84,11 +103,11 @@ BackendIndexer::populateCaches( cursor = curCursor; for (auto& obj : objects) { - keysCumulative.insert(obj.key); + addKeyAsync(obj.key); if (isOffer(obj.blob)) { auto book = getBook(obj.blob); - booksCumulative[book].insert(obj.key); + addBookOfferAsync(book, obj.key); } } if (!cursor) @@ -101,6 +120,62 @@ BackendIndexer::populateCaches( std::this_thread::sleep_for(std::chrono::seconds(2)); } } + // Do reconcilation. Remove anything from keys or books that shouldn't be + // there + { + std::unique_lock lck(mtx); + populatingCacheAsync = false; + } + auto tip = backend.fetchLatestLedgerSequence(); + for (auto& key : deletedKeys) + { + deleteKey(key); + } + for (auto& book : deletedBooks) + { + for (auto& offer : book.second) + { + deleteBookOffer(book.first, offer); + } + } + { + std::unique_lock lck(mtx); + deletedKeys = {}; + deletedBooks = {}; + cv_.notify_one(); + } + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. keys.size() = " << std::to_string(keysCumulative.size()); +} +void +BackendIndexer::populateCachesAsync( + BackendInterface const& backend, + std::optional sequence) +{ + if (keysCumulative.size() > 0) + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " caches already populated. returning"; + return; + } + { + std::unique_lock lck(mtx); + populatingCacheAsync = true; + } + BOOST_LOG_TRIVIAL(info) << __func__; + boost::asio::post(ioc_, [this, sequence, &backend]() { + populateCaches(backend, sequence); + }); +} + +void +BackendIndexer::waitForCaches() +{ + std::unique_lock lck(mtx); + cv_.wait(lck, [this]() { + return !populatingCacheAsync && deletedKeys.size() == 0; + }); } void @@ -119,6 +194,7 @@ BackendIndexer::writeNext( if (isFlag) { + waitForCaches(); auto booksCopy = booksCumulative; auto keysCopy = keysCumulative; boost::asio::post(ioc_, [=, &backend]() { diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 899c910f..dfe937b1 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -70,30 +70,51 @@ class BackendIndexer std::thread ioThread_; uint32_t shift_ = 16; std::unordered_set keys; - std::unordered_map> - books; std::unordered_set keysCumulative; + std::unordered_map> + books; std::unordered_map> booksCumulative; + bool populatingCacheAsync = false; + // These are only used when the cache is being populated asynchronously + std::unordered_set deletedKeys; + std::unordered_map> + deletedBooks; + std::mutex mtx; + std::condition_variable cv_; + + void + addKeyAsync(ripple::uint256 const& key); + void + addBookOfferAsync( + ripple::uint256 const& book, + ripple::uint256 const& offerKey); public: BackendIndexer(boost::json::object const& config); ~BackendIndexer(); + void + populateCachesAsync( + BackendInterface const& backend, + std::optional sequence = {}); void populateCaches( BackendInterface const& backend, std::optional sequence = {}); void clearCaches(); + // Blocking, possibly for minutes + void + waitForCaches(); void addKey(ripple::uint256 const& key); void deleteKey(ripple::uint256 const& key); - void addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey); + void deleteBookOffer( ripple::uint256 const& book, @@ -253,7 +274,8 @@ public: // other database methods // Open the database. Set up all of the necessary objects and - // datastructures. After this call completes, the database is ready for use. + // datastructures. After this call completes, the database is ready for + // use. virtual void open(bool readOnly) = 0; diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 2860187b..54536140 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -289,73 +289,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger( return hashes; } -LedgerPage -CassandraBackend::fetchLedgerPage2( - std::optional const& cursor, - std::uint32_t ledgerSequence, - std::uint32_t limit) const -{ - BOOST_LOG_TRIVIAL(trace) << __func__; - std::optional currentCursor = cursor; - std::vector objects; - uint32_t curLimit = limit; - while (objects.size() < limit) - { - CassandraStatement statement{selectLedgerPage_}; - - int64_t intCursor = INT64_MIN; - if (currentCursor) - { - auto token = getToken(currentCursor->data()); - if (token) - intCursor = *token; - } - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - cursor = " << std::to_string(intCursor) - << " , sequence = " << std::to_string(ledgerSequence) - << ", - limit = " << std::to_string(limit); - statement.bindInt(intCursor); - statement.bindInt(ledgerSequence); - statement.bindUInt(curLimit); - - CassandraResult result = executeSyncRead(statement); - - if (!!result) - { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - got keys - size = " << result.numRows(); - - size_t prevSize = objects.size(); - do - { - std::vector object = result.getBytes(); - if (object.size()) - { - objects.push_back({result.getUInt256(), std::move(object)}); - } - } while (result.nextRow()); - size_t prevBatchSize = objects.size() - prevSize; - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - added to objects. size = " << objects.size(); - if (result.numRows() < curLimit) - { - currentCursor = {}; - break; - } - if (objects.size() < limit) - { - curLimit = 2048; - } - assert(objects.size()); - currentCursor = objects[objects.size() - 1].key; - } - } - if (objects.size()) - return {objects, currentCursor}; - - return {{}, {}}; -} - struct ReadDiffCallbackData { CassandraBackend const& backend; @@ -480,12 +413,12 @@ CassandraBackend::fetchLedgerPage( if (!index) return {}; LedgerPage page; - if (cursor) - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - Cursor = " << ripple::strHex(*cursor); BOOST_LOG_TRIVIAL(debug) << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence) << " index = " << std::to_string(*index); + if (cursor) + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - Cursor = " << ripple::strHex(*cursor); CassandraStatement statement{selectKeys_}; statement.bindInt(*index); if (cursor) @@ -497,10 +430,9 @@ CassandraBackend::fetchLedgerPage( } statement.bindUInt(limit); CassandraResult result = executeSyncRead(statement); - BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys"; if (!!result) { - BOOST_LOG_TRIVIAL(debug) + BOOST_LOG_TRIVIAL(trace) << __func__ << " - got keys - size = " << result.numRows(); std::vector keys; @@ -508,17 +440,14 @@ CassandraBackend::fetchLedgerPage( { keys.push_back(result.getUInt256()); } while (result.nextRow()); - BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys"; auto objects = fetchLedgerObjects(keys, ledgerSequence); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Using base ledger. Got objects"; if (objects.size() != keys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); if (keys.size() == limit) page.cursor = keys[keys.size() - 1]; if (cursor) - BOOST_LOG_TRIVIAL(debug) + BOOST_LOG_TRIVIAL(trace) << __func__ << " Cursor = " << ripple::strHex(*page.cursor); for (size_t i = 0; i < objects.size(); ++i) @@ -530,7 +459,7 @@ CassandraBackend::fetchLedgerPage( page.objects.push_back({std::move(key), std::move(obj)}); } } - if (keys.size() && keys[0].isZero()) + if (keys.size() && !cursor && !keys[0].isZero()) page.warning = "Data may be incomplete"; return page; } diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 3b89f475..e5732a2b 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -945,11 +945,6 @@ public: return {{result.getBytes(), result.getBytes(), result.getUInt32()}}; } LedgerPage - fetchLedgerPage2( - std::optional const& cursor, - std::uint32_t ledgerSequence, - std::uint32_t limit) const; - LedgerPage fetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 5b10a274..969f6572 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -362,7 +362,7 @@ PostgresBackend::fetchLedgerPage( results.push_back({keys[i], objs[i]}); } } - if (keys[0].isZero()) + if (!cursor && !keys[0].isZero()) return {results, returnCursor, "Data may be incomplete"}; return {results, returnCursor}; } diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 270d8006..5a15a581 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -367,7 +367,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) } BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populating caches"; - flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_); + + flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_); BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populated caches"; From 736e0a675fd29488091ba94d17017ce5912ff278 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Mon, 3 May 2021 18:54:32 +0000 Subject: [PATCH 4/5] small fixes in CassandraBackend --- reporting/CassandraBackend.cpp | 26 ++++++------------ reporting/CassandraBackend.h | 48 ---------------------------------- 2 files changed, 8 insertions(+), 66 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 54536140..64c63e96 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -36,7 +36,7 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) std::make_shared( backend.getIOContext(), std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &func]( + timer->async_wait([timer, &requestParams, func]( const boost::system::error_code& error) { func(requestParams, true); }); @@ -66,16 +66,6 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) processAsyncWriteResponse(requestParams, fut, func); } -void -flatMapWriteBookCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeBook(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} /* void @@ -564,7 +554,7 @@ struct WriteBookCallbackData ripple::uint256 offerKey; uint32_t ledgerSequence; std::condition_variable& cv; - std::atomic_uint32_t& numRemaining; + std::atomic_uint32_t& numOutstanding; std::mutex& mtx; uint32_t currentRetries = 0; WriteBookCallbackData( @@ -574,14 +564,14 @@ struct WriteBookCallbackData uint32_t ledgerSequence, std::condition_variable& cv, std::mutex& mtx, - std::atomic_uint32_t& numRemaining) + std::atomic_uint32_t& numOutstanding) : backend(backend) , book(book) , offerKey(offerKey) , ledgerSequence(ledgerSequence) , cv(cv) , mtx(mtx) - , numRemaining(numRemaining) + , numOutstanding(numOutstanding) { } @@ -589,7 +579,7 @@ struct WriteBookCallbackData void writeBookCallback(CassFuture* fut, void* cbData); void -writeBook2(WriteBookCallbackData& cb) +writeBook(WriteBookCallbackData& cb) { CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; statement.bindBytes(cb.book); @@ -622,7 +612,7 @@ writeBookCallback(CassFuture* fut, void* cbData) std::chrono::steady_clock::now() + wait); timer->async_wait( [timer, &requestParams](const boost::system::error_code& error) { - writeBook2(requestParams); + writeBook(requestParams); }); } else @@ -630,7 +620,7 @@ writeBookCallback(CassFuture* fut, void* cbData) BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; { std::lock_guard lck(requestParams.mtx); - --requestParams.numRemaining; + --requestParams.numOutstanding; requestParams.cv.notify_one(); } } @@ -789,7 +779,7 @@ CassandraBackend::writeBooks( cv, mtx, numOutstanding)); - writeBook2(*cbs.back()); + writeBook(*cbs.back()); BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index e5732a2b..d4bc54d1 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -1220,54 +1220,6 @@ public: } } - /* - void - writeDeletedKey(WriteCallbackData& data, bool isRetry) const - { - CassandraStatement statement{insertKey_}; - statement.bindBytes(data.key); - statement.bindInt(data.createdSequence); - statement.bindInt(data.sequence); - executeAsyncWrite(statement, flatMapWriteKeyCallback, data, isRetry); - } - - void - writeKey(WriteCallbackData& data, bool isRetry) const - { - if (data.isCreated) - { - CassandraStatement statement{insertKey_}; - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - statement.bindInt(INT64_MAX); - - executeAsyncWrite( - statement, flatMapWriteKeyCallback, data, isRetry); - } - else if (data.isDeleted) - { - CassandraStatement statement{getCreated_}; - - executeAsyncWrite( - statement, flatMapGetCreatedCallback, data, isRetry); - } - } - */ - - void - writeBook(WriteCallbackData& data, bool isRetry) const - { - assert(data.isCreated or data.isDeleted); - assert(data.book); - CassandraStatement statement{ - (data.isCreated ? insertBook_ : deleteBook_)}; - statement.bindBytes(*data.book); - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - if (data.isCreated) - statement.bindInt(INT64_MAX); - executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry); - } void doWriteLedgerObject( std::string&& key, From 20b8059151567e0521100cfa106f97cc02b4deab Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 5 May 2021 20:22:57 +0000 Subject: [PATCH 5/5] fix up some issues with async indexer --- reporting/BackendIndexer.cpp | 145 +++++++++++++++++++++++---------- reporting/BackendInterface.h | 66 +++++++++++---- reporting/CassandraBackend.cpp | 15 +++- reporting/CassandraBackend.h | 36 +++++++- reporting/ETLHelpers.h | 14 ++++ reporting/Pg.cpp | 5 +- reporting/ReportingETL.cpp | 41 ++++++---- test.py | 4 +- 8 files changed, 241 insertions(+), 85 deletions(-) diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index aa364b31..a8282be0 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -64,6 +64,66 @@ BackendIndexer::deleteBookOffer( deletedBooks[book].insert(offerKey); } +void +writeFlagLedger( + uint32_t ledgerSequence, + uint32_t shift, + BackendInterface const& backend, + std::unordered_set const& keys, + std::unordered_map< + ripple::uint256, + std::unordered_set> const& books) + +{ + uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift)); + ripple::uint256 zero = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. ledgerSequence = " << std::to_string(ledgerSequence) + << " nextFlag = " << std::to_string(nextFlag) + << " keys.size() = " << std::to_string(keys.size()) + << " books.size() = " << std::to_string(books.size()); + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchLedgerPage({}, nextFlag, 1); + if (!(warning || objects.size() == 0)) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " flag ledger already written. sequence = " + << std::to_string(ledgerSequence) + << " next flag = " << std::to_string(nextFlag) + << "returning"; + return; + } + break; + } + catch (DatabaseTimeout& t) + { + ; + } + } + auto start = std::chrono::system_clock::now(); + backend.writeBooks(books, nextFlag); + backend.writeBooks({{zero, {zero}}}, nextFlag); + + BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ..."; + + backend.writeKeys(keys, nextFlag); + backend.writeKeys({zero}, nextFlag); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. ledgerSequence = " << std::to_string(ledgerSequence) + << " nextFlag = " << std::to_string(nextFlag) + << " keys.size() = " << std::to_string(keys.size()) + << " books.size() = " << std::to_string(books.size()) << " time = " + << std::chrono::duration_cast(end - start) + .count(); +} + void BackendIndexer::clearCaches() { @@ -77,9 +137,12 @@ BackendIndexer::populateCaches( std::optional sequence) { if (!sequence) - sequence = backend.fetchLatestLedgerSequence(); - if (!sequence) - return; + { + auto rng = backend.fetchLedgerRangeNoThrow(); + if (!rng) + return; + sequence = rng->maxSequence; + } BOOST_LOG_TRIVIAL(info) << __func__ << " sequence = " << std::to_string(*sequence); std::optional cursor; @@ -95,9 +158,9 @@ BackendIndexer::populateCaches( << __func__ << " performing index repair"; uint32_t lower = (*sequence - 1) >> shift_ << shift_; populateCaches(backend, lower); - writeNext(lower, backend); + writeFlagLedger( + lower, shift_, backend, keysCumulative, booksCumulative); clearCaches(); - continue; } BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; @@ -120,8 +183,8 @@ BackendIndexer::populateCaches( std::this_thread::sleep_for(std::chrono::seconds(2)); } } - // Do reconcilation. Remove anything from keys or books that shouldn't be - // there + // Do reconcilation. Remove anything from keys or books that shouldn't + // be there { std::unique_lock lck(mtx); populatingCacheAsync = false; @@ -163,7 +226,8 @@ BackendIndexer::populateCachesAsync( std::unique_lock lck(mtx); populatingCacheAsync = true; } - BOOST_LOG_TRIVIAL(info) << __func__; + BOOST_LOG_TRIVIAL(info) + << __func__ << " seq = " << (sequence ? std::to_string(*sequence) : ""); boost::asio::post(ioc_, [this, sequence, &backend]() { populateCaches(backend, sequence); }); @@ -179,56 +243,53 @@ BackendIndexer::waitForCaches() } void -BackendIndexer::writeNext( +BackendIndexer::writeFlagLedgerAsync( uint32_t ledgerSequence, BackendInterface const& backend) { BOOST_LOG_TRIVIAL(info) << __func__ << " starting. sequence = " << std::to_string(ledgerSequence); - bool isFlag = (ledgerSequence % (1 << shift_)) == 0; - if (!backend.fetchLedgerRange()) - { - isFlag = true; - } - if (isFlag) - { - waitForCaches(); - auto booksCopy = booksCumulative; - auto keysCopy = keysCumulative; - boost::asio::post(ioc_, [=, &backend]() { - uint32_t nextSeq = - ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); - ripple::uint256 zero = {}; - BOOST_LOG_TRIVIAL(info) << __func__ << " booksCumulative.size() = " - << std::to_string(booksCumulative.size()); - backend.writeBooks(booksCopy, nextSeq); - backend.writeBooks({{zero, {zero}}}, nextSeq); - BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books"; - BOOST_LOG_TRIVIAL(info) << __func__ << " keysCumulative.size() = " - << std::to_string(keysCumulative.size()); - backend.writeKeys(keysCopy, nextSeq); - backend.writeKeys({zero}, nextSeq); - BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys"; - }); - } + waitForCaches(); + auto booksCopy = booksCumulative; + auto keysCopy = keysCumulative; + boost::asio::post(ioc_, [=, this, &backend]() { + writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy); + }); + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. sequence = " << std::to_string(ledgerSequence); } void BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) { - bool isFlag = ledgerSequence % (1 << shift_) == 0; - if (!backend.fetchLedgerRange()) + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. sequence = " << std::to_string(ledgerSequence); + bool isFirst = false; + uint32_t index = getIndexOfSeq(ledgerSequence); + auto rng = backend.fetchLedgerRangeNoThrow(); + if (!rng || rng->minSequence == ledgerSequence) { - isFlag = true; + isFirst = true; + index = ledgerSequence; + } + backend.writeKeys(keys, index); + backend.writeBooks(books, index); + if (isFirst) + { + ripple::uint256 zero = {}; + backend.writeBooks({{zero, {zero}}}, ledgerSequence); + backend.writeKeys({zero}, ledgerSequence); + writeFlagLedgerAsync(ledgerSequence, backend); } - uint32_t nextSeq = ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); - uint32_t curSeq = isFlag ? ledgerSequence : nextSeq; - backend.writeKeys(keys, curSeq); keys = {}; - backend.writeBooks(books, curSeq); books = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. sequence = " << std::to_string(ledgerSequence); } // namespace Backend } // namespace Backend diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index dfe937b1..b7743b1b 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -123,12 +123,27 @@ public: void finish(uint32_t ledgerSequence, BackendInterface const& backend); void - writeNext(uint32_t ledgerSequence, BackendInterface const& backend); + writeFlagLedgerAsync( + uint32_t ledgerSequence, + BackendInterface const& backend); uint32_t getShift() { return shift_; } + uint32_t + getIndexOfSeq(uint32_t seq) const + { + if (isFlagLedger(seq)) + return seq; + auto incr = (1 << shift_); + return (seq >> shift_ << shift_) + incr; + } + bool + isFlagLedger(uint32_t ledgerSequence) const + { + return (ledgerSequence % (1 << shift_)) == 0; + } }; class BackendInterface @@ -151,15 +166,27 @@ public: std::optional getIndexOfSeq(uint32_t seq) const { - if (!fetchLedgerRange()) + if (indexer_.isFlagLedger(seq)) + return seq; + auto rng = fetchLedgerRange(); + if (!rng) return {}; - if (fetchLedgerRange()->minSequence == seq) + if (rng->minSequence == seq) return seq; - uint32_t shift = indexer_.getShift(); - uint32_t incr = (1 << shift); - if ((seq % incr) == 0) - return seq; - return (seq >> shift << shift) + incr; + return indexer_.getIndexOfSeq(seq); + } + + bool + finishWrites(uint32_t ledgerSequence) const + { + indexer_.finish(ledgerSequence, *this); + auto commitRes = doFinishWrites(); + if (commitRes) + { + if (indexer_.isFlagLedger(ledgerSequence)) + indexer_.writeFlagLedgerAsync(ledgerSequence, *this); + } + return commitRes; } virtual std::optional @@ -171,6 +198,22 @@ public: virtual std::optional fetchLedgerRange() const = 0; + std::optional + fetchLedgerRangeNoThrow() const + { + while (true) + { + try + { + return fetchLedgerRange(); + } + catch (DatabaseTimeout& t) + { + ; + } + } + } + virtual std::optional fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0; @@ -286,13 +329,6 @@ public: virtual void startWrites() const = 0; - bool - finishWrites(uint32_t ledgerSequence) const - { - indexer_.finish(ledgerSequence, *this); - indexer_.writeNext(ledgerSequence, *this); - return doFinishWrites(); - } virtual bool doFinishWrites() const = 0; diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 64c63e96..3e810ed4 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -708,13 +708,15 @@ CassandraBackend::writeKeys( { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) - << " . num keys = " << std::to_string(keys.size()); + << " . num keys = " << std::to_string(keys.size()) + << " . concurrentLimit = " + << std::to_string(indexerMaxRequestsOutstanding); std::atomic_uint32_t numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; std::vector> cbs; cbs.reserve(keys.size()); - uint32_t concurrentLimit = maxRequestsOutstanding; + uint32_t concurrentLimit = indexerMaxRequestsOutstanding; uint32_t numSubmitted = 0; for (auto& key : keys) { @@ -761,7 +763,7 @@ CassandraBackend::writeBooks( std::condition_variable cv; std::mutex mtx; std::vector> cbs; - uint32_t concurrentLimit = maxRequestsOutstanding / 2; + uint32_t concurrentLimit = indexerMaxRequestsOutstanding; std::atomic_uint32_t numOutstanding = 0; size_t count = 0; auto start = std::chrono::system_clock::now(); @@ -1429,7 +1431,7 @@ CassandraBackend::open(bool readOnly) query = {}; query << "SELECT key FROM " << tablePrefix << "keys" - << " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?"; + << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?"; if (!selectKeys_.prepareStatement(query, session_.get())) continue; @@ -1613,6 +1615,11 @@ CassandraBackend::open(bool readOnly) { maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); } + if (config_.contains("indexer_max_requests_outstanding")) + { + indexerMaxRequestsOutstanding = + config_["indexer_max_requests_outstanding"].as_int64(); + } /* if (config_.contains("run_indexer")) { diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index d4bc54d1..7913fe6e 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -634,6 +634,9 @@ private: // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded uint32_t maxRequestsOutstanding = 10000; + // we keep this small because the indexer runs in the background, and we + // don't want the database to be swamped when the indexer is running + uint32_t indexerMaxRequestsOutstanding = 10; mutable std::atomic_uint32_t numRequestsOutstanding_ = 0; // mutex and condition_variable to limit the number of concurrent in flight @@ -798,6 +801,14 @@ public: { // wait for all other writes to finish sync(); + auto rng = fetchLedgerRangeNoThrow(); + if (rng && rng->maxSequence >= ledgerSequence_) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Ledger " << std::to_string(ledgerSequence_) + << " already written. Returning"; + return false; + } // write range if (isFirstLedger_) { @@ -811,7 +822,16 @@ public: statement.bindInt(ledgerSequence_); statement.bindBoolean(true); statement.bindInt(ledgerSequence_ - 1); - return executeSyncUpdate(statement); + if (!executeSyncUpdate(statement)) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Update failed for ledger " + << std::to_string(ledgerSequence_) << ". Returning"; + return false; + } + BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger " + << std::to_string(ledgerSequence_); + return true; } void writeLedger( @@ -1495,6 +1515,7 @@ public: bool executeSyncUpdate(CassandraStatement const& statement) const { + bool timedOut = false; CassFuture* fut; CassError rc; do @@ -1503,8 +1524,9 @@ public: rc = cass_future_error_code(fut); if (rc != CASS_OK) { + timedOut = true; std::stringstream ss; - ss << "Cassandra sync write error"; + ss << "Cassandra sync update error"; ss << ", retrying"; ss << ": " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(warning) << ss.str(); @@ -1532,7 +1554,15 @@ public: return false; } cass_result_free(res); - return success == cass_true; + if (success != cass_true && timedOut) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Update failed, but timedOut is true"; + } + // if there was a timeout, the update may have succeeded in the + // background. We can't differentiate between an async success and + // another writer, so we just return true here + return success == cass_true || timedOut; } CassandraResult diff --git a/reporting/ETLHelpers.h b/reporting/ETLHelpers.h index 1f225a86..e6cc8afb 100644 --- a/reporting/ETLHelpers.h +++ b/reporting/ETLHelpers.h @@ -156,6 +156,20 @@ public: cv_.notify_all(); return ret; } + /// @return element popped from queue. Will block until queue is non-empty + std::optional + tryPop() + { + std::unique_lock lck(m_); + if (queue_.empty()) + return {}; + T ret = std::move(queue_.front()); + queue_.pop(); + // if queue has a max size, unblock any possible pushers + if (maxSize_) + cv_.notify_all(); + return ret; + } }; /// Parititions the uint256 keyspace into numMarkers partitions, each of equal diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 77d38927..24dfc642 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -250,6 +250,7 @@ Pg::bulkInsert(char const* table, std::string const& records) << ". Postgres insert error: " << res.msg(); if (res) ss << ". Query status not PGRES_COPY_IN: " << res.status(); + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } @@ -259,6 +260,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyData error: " << PQerrorMessage(conn_.get()); disconnect(); + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } @@ -268,6 +270,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyEnd error: " << PQerrorMessage(conn_.get()); disconnect(); + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } @@ -282,7 +285,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status; disconnect(); - BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records; + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } } diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 5a15a581..3306c2fd 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -89,8 +89,8 @@ std::optional ReportingETL::loadInitialLedger(uint32_t startingSequence) { // check that database is actually empty - auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence); - if (ledger) + auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); + if (rng) { BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " << "Database is not empty"; @@ -156,7 +156,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { try { - auto range = flatMapBackend_->fetchLedgerRange(); + auto range = flatMapBackend_->fetchLedgerRangeNoThrow(); if (!range || range->maxSequence < ledgerSequence) { @@ -359,8 +359,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Starting etl pipeline"; writing_ = true; - auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1); - if (!parent) + auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); + if (!rng || rng->maxSequence != startSequence - 1) { assert(false); throw std::runtime_error("runETLPipeline: parent ledger is null"); @@ -385,19 +385,19 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) std::cout << std::to_string((sequence - startSequence) % numExtractors); return queues[(sequence - startSequence) % numExtractors]; }; - std::vector threads; + std::vector extractors; for (size_t i = 0; i < numExtractors; ++i) { auto transformQueue = std::make_shared(maxQueueSize); queues.push_back(transformQueue); std::cout << "added to queues"; - threads.emplace_back([this, - &startSequence, - &writeConflict, - transformQueue, - i, - numExtractors]() { + extractors.emplace_back([this, + &startSequence, + &writeConflict, + transformQueue, + i, + numExtractors]() { beast::setCurrentThreadName("rippled: ReportingETL extract"); uint32_t currentSequence = startSequence + i; @@ -503,7 +503,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; - auto range = flatMapBackend_->fetchLedgerRange(); + auto range = flatMapBackend_->fetchLedgerRangeNoThrow(); if (onlineDeleteInterval_ && !deleting_ && range->maxSequence - range->minSequence > *onlineDeleteInterval_) @@ -520,10 +520,15 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) } }}; - // wait for all of the threads to stop - for (auto& t : threads) - t.join(); transformer.join(); + for (size_t i = 0; i < numExtractors; ++i) + { + // pop from each queue that might be blocked on a push + getNext(i)->tryPop(); + } + // wait for all of the extractors to stop + for (auto& t : extractors) + t.join(); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Extracted and wrote " << *lastPublishedSequence - startSequence @@ -598,8 +603,8 @@ ReportingETL::monitor() { if (startSequence_) { - throw std::runtime_error( - "start sequence specified but db is already populated"); + BOOST_LOG_TRIVIAL(warning) + << "start sequence specified but db is already populated"; } BOOST_LOG_TRIVIAL(info) << __func__ << " : " diff --git a/test.py b/test.py index a71640cf..7c5aa898 100755 --- a/test.py +++ b/test.py @@ -510,7 +510,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1): print(json.dumps(x)) blobs.append(x) keys.append(x["index"]) - if limit != -1 and len(keys) > count: + if count != -1 and len(keys) > count: print("stopping early") print(len(keys)) print("done") @@ -598,7 +598,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, req["cursor"] = cursor await ws.send(json.dumps(req)) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) + #print(json.dumps(res,indent=4,sort_keys=True)) if "result" in res: res = res["result"] for x in res["offers"]: