diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index a8282be0..4a294ade 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -2,8 +2,11 @@ namespace Backend { BackendIndexer::BackendIndexer(boost::json::object const& config) - : shift_(config.at("indexer_shift").as_int64()) { + if (config.contains("indexer_key_shift")) + keyShift_ = config.at("indexer_key_shift").as_int64(); + if (config.contains("indexer_book_shift")) + bookShift_ = config.at("indexer_book_shift").as_int64(); work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; }; @@ -65,15 +68,11 @@ BackendIndexer::deleteBookOffer( } void -writeFlagLedger( +writeKeyFlagLedger( uint32_t ledgerSequence, uint32_t shift, BackendInterface const& backend, - std::unordered_set const& keys, - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books) - + std::unordered_set const& keys) { uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift)); ripple::uint256 zero = {}; @@ -81,15 +80,14 @@ writeFlagLedger( << __func__ << " starting. ledgerSequence = " << std::to_string(ledgerSequence) << " nextFlag = " << std::to_string(nextFlag) - << " keys.size() = " << std::to_string(keys.size()) - << " books.size() = " << std::to_string(books.size()); + << " keys.size() = " << std::to_string(keys.size()); while (true) { try { auto [objects, curCursor, warning] = backend.fetchLedgerPage({}, nextFlag, 1); - if (!(warning || objects.size() == 0)) + if (!warning) { BOOST_LOG_TRIVIAL(warning) << __func__ << " flag ledger already written. sequence = " @@ -106,19 +104,66 @@ writeFlagLedger( } } auto start = std::chrono::system_clock::now(); - backend.writeBooks(books, nextFlag); - backend.writeBooks({{zero, {zero}}}, nextFlag); - BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ..."; - - backend.writeKeys(keys, nextFlag); - backend.writeKeys({zero}, nextFlag); + backend.writeKeys(keys, nextFlag, true); + backend.writeKeys({zero}, nextFlag, true); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << " finished. ledgerSequence = " << std::to_string(ledgerSequence) << " nextFlag = " << std::to_string(nextFlag) << " keys.size() = " << std::to_string(keys.size()) + << std::chrono::duration_cast(end - start) + .count(); +} +void +writeBookFlagLedger( + uint32_t ledgerSequence, + uint32_t shift, + BackendInterface const& backend, + std::unordered_map< + ripple::uint256, + std::unordered_set> const& books) + +{ + uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift)); + ripple::uint256 zero = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. ledgerSequence = " << std::to_string(ledgerSequence) + << " nextFlag = " << std::to_string(nextFlag) + << " books.size() = " << std::to_string(books.size()); + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchBookOffers(zero, nextFlag, 1); + if (!warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " flag ledger already written. sequence = " + << std::to_string(ledgerSequence) + << " next flag = " << std::to_string(nextFlag) + << "returning"; + return; + } + break; + } + catch (DatabaseTimeout& t) + { + ; + } + } + auto start = std::chrono::system_clock::now(); + backend.writeBooks(books, nextFlag, true); + backend.writeBooks({{zero, {zero}}}, nextFlag, true); + + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. ledgerSequence = " << std::to_string(ledgerSequence) + << " nextFlag = " << std::to_string(nextFlag) << " books.size() = " << std::to_string(books.size()) << " time = " << std::chrono::duration_cast(end - start) .count(); @@ -132,7 +177,80 @@ BackendIndexer::clearCaches() } void -BackendIndexer::populateCaches( +BackendIndexer::doBooksRepair( + BackendInterface const& backend, + std::optional sequence) +{ + if (!sequence) + { + auto rng = backend.fetchLedgerRangeNoThrow(); + if (!rng) + return; + sequence = rng->maxSequence; + } + BOOST_LOG_TRIVIAL(info) + << __func__ << " sequence = " << std::to_string(*sequence); + ripple::uint256 zero = {}; + while (true) + { + try + { + auto [objects, cursor, warning] = + backend.fetchBookOffers(zero, *sequence, 1); + if (!warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " flag ledger already written. sequence = " + << std::to_string(*sequence) << "returning"; + return; + } + else + { + uint32_t lower = (*sequence - 1) >> bookShift_ << bookShift_; + doBooksRepair(backend, lower); + } + break; + } + catch (DatabaseTimeout& t) + { + ; + } + } + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, *sequence, 2048); + + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + for (auto& obj : objects) + { + if (isOffer(obj.blob)) + { + auto book = getBook(obj.blob); + booksRepair[book].insert(obj.key); + } + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + writeBookFlagLedger(*sequence, bookShift_, backend, booksRepair); + booksRepair = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ << " finished. sequence = " << std::to_string(*sequence); +} +void +BackendIndexer::doKeysRepair( BackendInterface const& backend, std::optional sequence) { @@ -152,16 +270,65 @@ BackendIndexer::populateCaches( { auto [objects, curCursor, warning] = backend.fetchLedgerPage(cursor, *sequence, 2048); - if (warning) + // no cursor means this is the first page + if (!cursor) { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " performing index repair"; - uint32_t lower = (*sequence - 1) >> shift_ << shift_; - populateCaches(backend, lower); - writeFlagLedger( - lower, shift_, backend, keysCumulative, booksCumulative); - clearCaches(); + // if there is no warning, we don't need to do a repair + // warning only shows up on the first page + if (!warning) + { + BOOST_LOG_TRIVIAL(info) + << __func__ + << " flag ledger already written. returning"; + return; + } + else + { + uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_; + doKeysRepair(backend, lower); + } } + + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + for (auto& obj : objects) + { + keysRepair.insert(obj.key); + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + writeKeyFlagLedger(*sequence, keyShift_, backend, keysRepair); + keysRepair = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ << " finished. sequence = " << std::to_string(*sequence); +} + +void +BackendIndexer::populateCaches(BackendInterface const& backend) +{ + auto rng = backend.fetchLedgerRangeNoThrow(); + if (!rng) + return; + uint32_t sequence = rng->maxSequence; + BOOST_LOG_TRIVIAL(info) + << __func__ << " sequence = " << std::to_string(sequence); + doBooksRepair(backend, sequence); + doKeysRepair(backend, sequence); + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, sequence, 2048); BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) @@ -189,7 +356,6 @@ BackendIndexer::populateCaches( std::unique_lock lck(mtx); populatingCacheAsync = false; } - auto tip = backend.fetchLatestLedgerSequence(); for (auto& key : deletedKeys) { deleteKey(key); @@ -212,9 +378,7 @@ BackendIndexer::populateCaches( << " finished. keys.size() = " << std::to_string(keysCumulative.size()); } void -BackendIndexer::populateCachesAsync( - BackendInterface const& backend, - std::optional sequence) +BackendIndexer::populateCachesAsync(BackendInterface const& backend) { if (keysCumulative.size() > 0) { @@ -226,11 +390,8 @@ BackendIndexer::populateCachesAsync( std::unique_lock lck(mtx); populatingCacheAsync = true; } - BOOST_LOG_TRIVIAL(info) - << __func__ << " seq = " << (sequence ? std::to_string(*sequence) : ""); - boost::asio::post(ioc_, [this, sequence, &backend]() { - populateCaches(backend, sequence); - }); + BOOST_LOG_TRIVIAL(info) << __func__; + boost::asio::post(ioc_, [this, &backend]() { populateCaches(backend); }); } void @@ -243,7 +404,25 @@ BackendIndexer::waitForCaches() } void -BackendIndexer::writeFlagLedgerAsync( +BackendIndexer::writeKeyFlagLedgerAsync( + uint32_t ledgerSequence, + BackendInterface const& backend) +{ + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. sequence = " << std::to_string(ledgerSequence); + + waitForCaches(); + auto keysCopy = keysCumulative; + boost::asio::post(ioc_, [=, this, &backend]() { + writeKeyFlagLedger(ledgerSequence, keyShift_, backend, keysCopy); + }); + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. sequence = " << std::to_string(ledgerSequence); +} +void +BackendIndexer::writeBookFlagLedgerAsync( uint32_t ledgerSequence, BackendInterface const& backend) { @@ -253,9 +432,8 @@ BackendIndexer::writeFlagLedgerAsync( waitForCaches(); auto booksCopy = booksCumulative; - auto keysCopy = keysCumulative; boost::asio::post(ioc_, [=, this, &backend]() { - writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy); + writeBookFlagLedger(ledgerSequence, bookShift_, backend, booksCopy); }); BOOST_LOG_TRIVIAL(info) << __func__ @@ -269,21 +447,23 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) << __func__ << " starting. sequence = " << std::to_string(ledgerSequence); bool isFirst = false; - uint32_t index = getIndexOfSeq(ledgerSequence); + uint32_t keyIndex = getKeyIndexOfSeq(ledgerSequence); + uint32_t bookIndex = getKeyIndexOfSeq(ledgerSequence); auto rng = backend.fetchLedgerRangeNoThrow(); if (!rng || rng->minSequence == ledgerSequence) { isFirst = true; - index = ledgerSequence; + keyIndex = bookIndex = ledgerSequence; } - backend.writeKeys(keys, index); - backend.writeBooks(books, index); + backend.writeKeys(keys, keyIndex); + backend.writeBooks(books, bookIndex); if (isFirst) { ripple::uint256 zero = {}; backend.writeBooks({{zero, {zero}}}, ledgerSequence); backend.writeKeys({zero}, ledgerSequence); - writeFlagLedgerAsync(ledgerSequence, backend); + writeBookFlagLedgerAsync(ledgerSequence, backend); + writeKeyFlagLedgerAsync(ledgerSequence, backend); } keys = {}; books = {}; diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index b7743b1b..0cc7c3ea 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -68,7 +68,8 @@ class BackendIndexer std::mutex mutex_; std::optional work_; std::thread ioThread_; - uint32_t shift_ = 16; + uint32_t keyShift_ = 20; + uint32_t bookShift_ = 10; std::unordered_set keys; std::unordered_set keysCumulative; std::unordered_map> @@ -80,6 +81,9 @@ class BackendIndexer std::unordered_set deletedKeys; std::unordered_map> deletedBooks; + std::unordered_set keysRepair; + std::unordered_map> + booksRepair; std::mutex mtx; std::condition_variable cv_; @@ -95,13 +99,9 @@ public: ~BackendIndexer(); void - populateCachesAsync( - BackendInterface const& backend, - std::optional sequence = {}); + populateCachesAsync(BackendInterface const& backend); void - populateCaches( - BackendInterface const& backend, - std::optional sequence = {}); + populateCaches(BackendInterface const& backend); void clearCaches(); // Blocking, possibly for minutes @@ -123,26 +123,56 @@ public: void finish(uint32_t ledgerSequence, BackendInterface const& backend); void - writeFlagLedgerAsync( + writeKeyFlagLedgerAsync( uint32_t ledgerSequence, BackendInterface const& backend); + void + writeBookFlagLedgerAsync( + uint32_t ledgerSequence, + BackendInterface const& backend); + void + doKeysRepair( + BackendInterface const& backend, + std::optional sequence); + void + doBooksRepair( + BackendInterface const& backend, + std::optional sequence); uint32_t - getShift() + getBookShift() { - return shift_; + return bookShift_; } uint32_t - getIndexOfSeq(uint32_t seq) const + getKeyShift() { - if (isFlagLedger(seq)) + return keyShift_; + } + uint32_t + getKeyIndexOfSeq(uint32_t seq) const + { + if (isKeyFlagLedger(seq)) return seq; - auto incr = (1 << shift_); - return (seq >> shift_ << shift_) + incr; + auto incr = (1 << keyShift_); + return (seq >> keyShift_ << keyShift_) + incr; } bool - isFlagLedger(uint32_t ledgerSequence) const + isKeyFlagLedger(uint32_t ledgerSequence) const { - return (ledgerSequence % (1 << shift_)) == 0; + return (ledgerSequence % (1 << keyShift_)) == 0; + } + uint32_t + getBookIndexOfSeq(uint32_t seq) const + { + if (isBookFlagLedger(seq)) + return seq; + auto incr = (1 << bookShift_); + return (seq >> bookShift_ << bookShift_) + incr; + } + bool + isBookFlagLedger(uint32_t ledgerSequence) const + { + return (ledgerSequence % (1 << bookShift_)) == 0; } }; @@ -164,16 +194,28 @@ public: } std::optional - getIndexOfSeq(uint32_t seq) const + getKeyIndexOfSeq(uint32_t seq) const { - if (indexer_.isFlagLedger(seq)) + if (indexer_.isKeyFlagLedger(seq)) return seq; auto rng = fetchLedgerRange(); if (!rng) return {}; if (rng->minSequence == seq) return seq; - return indexer_.getIndexOfSeq(seq); + return indexer_.getKeyIndexOfSeq(seq); + } + std::optional + getBookIndexOfSeq(uint32_t seq) const + { + if (indexer_.isBookFlagLedger(seq)) + return seq; + auto rng = fetchLedgerRange(); + if (!rng) + return {}; + if (rng->minSequence == seq) + return seq; + return indexer_.getBookIndexOfSeq(seq); } bool @@ -183,8 +225,10 @@ public: auto commitRes = doFinishWrites(); if (commitRes) { - if (indexer_.isFlagLedger(ledgerSequence)) - indexer_.writeFlagLedgerAsync(ledgerSequence, *this); + if (indexer_.isBookFlagLedger(ledgerSequence)) + indexer_.writeBookFlagLedgerAsync(ledgerSequence, *this); + if (indexer_.isKeyFlagLedger(ledgerSequence)) + indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this); } return commitRes; } @@ -337,13 +381,15 @@ public: virtual bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const = 0; + uint32_t ledgerSequence, + bool isAsync = false) const = 0; virtual bool writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const = 0; + uint32_t ledgerSequence, + bool isAsync = false) const = 0; virtual ~BackendInterface() { diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index e9a398e3..8ea5c330 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -399,7 +399,7 @@ CassandraBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { - auto index = getIndexOfSeq(ledgerSequence); + auto index = getKeyIndexOfSeq(ledgerSequence); if (!index) return {}; LedgerPage page; @@ -418,7 +418,7 @@ CassandraBackend::fetchLedgerPage( ripple::uint256 zero; statement.bindBytes(zero); } - statement.bindUInt(limit); + statement.bindUInt(limit + 1); CassandraResult result = executeSyncRead(statement); if (!!result) { @@ -430,11 +430,14 @@ CassandraBackend::fetchLedgerPage( { keys.push_back(result.getUInt256()); } while (result.nextRow()); + if (keys.size() && keys.size() == limit) + { + page.cursor = keys.back(); + keys.pop_back(); + } auto objects = fetchLedgerObjects(keys, ledgerSequence); if (objects.size() != keys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); - if (keys.size() == limit) - page.cursor = keys[keys.size() - 1]; if (cursor) BOOST_LOG_TRIVIAL(trace) @@ -449,11 +452,13 @@ CassandraBackend::fetchLedgerPage( page.objects.push_back({std::move(key), std::move(obj)}); } } - if (keys.size() && !cursor && !keys[0].isZero()) + if (!cursor && (!keys.size() || !keys[0].isZero())) page.warning = "Data may be incomplete"; return page; } - return {{}, {}}; + if (!cursor) + return {{}, {}, "Data may be incomplete"}; + return {}; } std::vector CassandraBackend::fetchLedgerObjects( @@ -777,7 +782,8 @@ writeKeyCallback(CassFuture* fut, void* cbData) bool CassandraBackend::writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) @@ -789,7 +795,8 @@ CassandraBackend::writeKeys( std::mutex mtx; std::vector> cbs; cbs.reserve(keys.size()); - uint32_t concurrentLimit = indexerMaxRequestsOutstanding; + uint32_t concurrentLimit = + isAsync ? indexerMaxRequestsOutstanding : keys.size(); uint32_t numSubmitted = 0; for (auto& key : keys) { @@ -828,7 +835,8 @@ CassandraBackend::writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) @@ -836,7 +844,8 @@ CassandraBackend::writeBooks( std::condition_variable cv; std::mutex mtx; std::vector> cbs; - uint32_t concurrentLimit = indexerMaxRequestsOutstanding; + uint32_t concurrentLimit = + isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; std::atomic_uint32_t numOutstanding = 0; size_t count = 0; auto start = std::chrono::system_clock::now(); diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 99b67fe3..ea40dbe5 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -1013,13 +1013,15 @@ public: bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const; bool writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const override; BookOffersPage fetchBookOffers( ripple::uint256 const& book, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 70a0107b..75c9ea1e 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -329,7 +329,7 @@ PostgresBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { - auto index = getIndexOfSeq(ledgerSequence); + auto index = getKeyIndexOfSeq(ledgerSequence); if (!index) return {}; PgQuery pgQuery(pgPool_); @@ -337,7 +337,7 @@ PostgresBackend::fetchLedgerPage( std::stringstream sql; sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index); if (cursor) - sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; + sql << " AND key > \'\\x" << ripple::strHex(*cursor) << "\'"; sql << " ORDER BY key ASC LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); auto res = pgQuery(sql.str().data()); @@ -366,6 +366,8 @@ PostgresBackend::fetchLedgerPage( return {results, returnCursor, "Data may be incomplete"}; return {results, returnCursor}; } + if (!cursor) + return {{}, {}, "Data may be incomplete"}; return {}; } @@ -436,6 +438,7 @@ PostgresBackend::fetchBookOffers( return {complete, results}; } + std::vector blobs = fetchLedgerObjects(keys, ledgerSequence); return {true, {}}; }; @@ -748,7 +751,8 @@ PostgresBackend::doFinishWrites() const bool PostgresBackend::writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); @@ -782,7 +786,8 @@ PostgresBackend::writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 84835417..a50d37c6 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -117,13 +117,15 @@ public: bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const override; bool writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const override; }; } // namespace Backend #endif diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index e9c315a8..44dd1448 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -296,7 +296,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); accumTxns_ += rawData.transactions_list().transactions_size(); bool success = true; - if (accumTxns_ > txnThreshold_) + if (accumTxns_ >= txnThreshold_) { auto start = std::chrono::system_clock::now(); success = flatMapBackend_->finishWrites(lgrInfo.seq);