diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index 0320d973..1cf19ce4 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -9,6 +9,7 @@ BackendIndexer::BackendIndexer(boost::json::object const& config) bookShift_ = config.at("indexer_book_shift").as_int64(); work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; + updateThread_ = std::thread{[this]() { ioc_.run(); }}; }; BackendIndexer::~BackendIndexer() { @@ -16,6 +17,39 @@ BackendIndexer::~BackendIndexer() work_.reset(); ioThread_.join(); } +void +BackendIndexer::writeLedgerObject( + ripple::uint256&& key, + std::optional&& book, + bool isCreated, + bool isDeleted) +{ + ++updatesOutstanding_; + boost::asio::post( + ioc_, + [this, + key = std::move(key), + isCreated, + isDeleted, + book = std::move(book)]() { + if (isCreated) + addKey(key); + if (isDeleted) + deleteKey(key); + if (book) + { + if (isCreated) + addBookOffer(*book, key); + if (isDeleted) + deleteBookOffer(*book, key); + } + --updatesOutstanding_; + { + std::unique_lock lck(mtx); + updateCv_.notify_one(); + } + }); +} void BackendIndexer::addKey(ripple::uint256 const& key) @@ -360,7 +394,7 @@ BackendIndexer::populateCaches(BackendInterface const& backend) std::unique_lock lck(mtx); deletedKeys = {}; deletedBooks = {}; - cv_.notify_one(); + cacheCv_.notify_one(); } BOOST_LOG_TRIVIAL(info) << __func__ @@ -387,7 +421,7 @@ void BackendIndexer::waitForCaches() { std::unique_lock lck(mtx); - cv_.wait(lck, [this]() { + cacheCv_.wait(lck, [this]() { return !populatingCacheAsync && deletedKeys.size() == 0; }); } @@ -449,6 +483,11 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) bookIndex = BookIndex{ledgerSequence}; } } + { + std::unique_lock lck(mtx); + updateCv_.wait(lck, [this]() { return updatesOutstanding_ == 0; }); + } + backend.writeKeys(keys, keyIndex); backend.writeBooks(books, bookIndex); if (isFirst_) diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index b6451445..524b7d44 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -81,6 +81,10 @@ class BackendIndexer std::mutex mutex_; std::optional work_; std::thread ioThread_; + std::thread updateThread_; + std::atomic_uint32_t updatesOutstanding_ = 0; + std::condition_variable updateCv_; + uint32_t keyShift_ = 20; uint32_t bookShift_ = 10; std::unordered_set keys; @@ -98,7 +102,7 @@ class BackendIndexer std::unordered_map> booksRepair; std::mutex mtx; - std::condition_variable cv_; + std::condition_variable cacheCv_; mutable bool isFirst_ = true; @@ -123,6 +127,13 @@ public: void waitForCaches(); + void + writeLedgerObject( + ripple::uint256&& key, + std::optional&& book, + bool isCreated, + bool isDeleted); + void addKey(ripple::uint256 const& key); void @@ -350,17 +361,8 @@ public: std::optional&& book) const { ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); - if (isCreated) - indexer_.addKey(key256); - if (isDeleted) - indexer_.deleteKey(key256); - if (book) - { - if (isCreated) - indexer_.addBookOffer(*book, key256); - if (isDeleted) - indexer_.deleteBookOffer(*book, key256); - } + indexer_.writeLedgerObject( + std::move(key256), std::move(book), isCreated, isDeleted); doWriteLedgerObject( std::move(key), seq, diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index b447efc9..e83b3351 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1359,12 +1359,14 @@ CassandraBackend::open(bool readOnly) ? config_["threads"].as_int64() : std::thread::hardware_concurrency(); int ttl = config_.contains("ttl") ? config_["ttl"].as_int64() * 2 : 0; - int keysTtl, keysIncr = pow(2, indexer_.getKeyShift()) * 4 * 2; + int keysTtl, + keysIncr = ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0; while (keysTtl < ttl) { keysTtl += keysIncr; } - int booksTtl, booksIncr = pow(2, indexer_.getBookShift()) * 4 * 2; + int booksTtl, + booksIncr = ttl != 0 ? pow(2, indexer_.getBookShift()) * 4 * 2 : 0; while (booksTtl < ttl) { booksTtl += booksIncr; diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index dcbb0e45..799ae715 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -818,29 +818,28 @@ PostgresBackend::writeKeys( KeyIndex const& index, bool isAsync) const { - return true; - if (isAsync) - return true; if (abortWrite_) return false; BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); PgQuery& conn = isAsync ? pgQuery : writeConnection_; + std::stringstream asyncBuffer; + std::stringstream& buffer = isAsync ? asyncBuffer : keysBuffer_; if (isAsync) conn("BEGIN"); size_t numRows = 0; for (auto& key : keys) { - keysBuffer_ << std::to_string(index.keyIndex) << '\t' << "\\\\x" - << ripple::strHex(key) << '\n'; + buffer << std::to_string(index.keyIndex) << '\t' << "\\\\x" + << ripple::strHex(key) << '\n'; numRows++; // If the buffer gets too large, the insert fails. Not sure why. // When writing in the background, we insert after every 10000 rows if ((isAsync && numRows == 10000) || numRows == 100000) { - conn.bulkInsert("keys", keysBuffer_.str()); + conn.bulkInsert("keys", buffer.str()); std::stringstream temp; - keysBuffer_.swap(temp); + buffer.swap(temp); numRows = 0; if (isAsync) std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -858,15 +857,14 @@ PostgresBackend::writeBooks( BookIndex const& index, bool isAsync) const { - return true; - if (isAsync) - return true; if (abortWrite_) return false; BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); PgQuery& conn = isAsync ? pgQuery : writeConnection_; + std::stringstream asyncBuffer; + std::stringstream& buffer = isAsync ? asyncBuffer : booksBuffer_; if (isAsync) conn("BEGIN"); size_t numRows = 0; @@ -874,17 +872,17 @@ PostgresBackend::writeBooks( { for (auto& offer : book.second) { - booksBuffer_ << std::to_string(index.bookIndex) << '\t' << "\\\\x" - << ripple::strHex(book.first) << '\t' << "\\\\x" - << ripple::strHex(offer) << '\n'; + buffer << std::to_string(index.bookIndex) << '\t' << "\\\\x" + << ripple::strHex(book.first) << '\t' << "\\\\x" + << ripple::strHex(offer) << '\n'; numRows++; // If the buffer gets too large, the insert fails. Not sure why. // When writing in the background, we insert after every 10 rows if ((isAsync && numRows == 1000) || numRows == 100000) { - conn.bulkInsert("books", booksBuffer_.str()); + conn.bulkInsert("books", buffer.str()); std::stringstream temp; - booksBuffer_.swap(temp); + buffer.swap(temp); numRows = 0; if (isAsync) std::this_thread::sleep_for(std::chrono::seconds(1)); diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index cdcb6ee4..023aaed7 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -258,13 +258,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) lgrInfo, std::move(*rawData.mutable_ledger_header())); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "wrote ledger header"; - std::vector accountTxData{ - insertTransactions(lgrInfo, rawData)}; - - BOOST_LOG_TRIVIAL(debug) - << __func__ << " : " - << "Inserted all transactions. Number of transactions = " - << rawData.transactions_list().transactions_size(); for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) { @@ -301,6 +294,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) << __func__ << " : " << "wrote objects. num objects = " << std::to_string(rawData.ledger_objects().objects_size()); + std::vector accountTxData{ + insertTransactions(lgrInfo, rawData)}; + + BOOST_LOG_TRIVIAL(debug) + << __func__ << " : " + << "Inserted all transactions. Number of transactions = " + << rawData.transactions_list().transactions_size(); flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "wrote account_tx";