diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index a8282be0..c92f131a 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -2,7 +2,8 @@ namespace Backend { BackendIndexer::BackendIndexer(boost::json::object const& config) - : shift_(config.at("indexer_shift").as_int64()) + : keyShift_(config.at("indexer_key_shift").as_int64()) + , bookShift_(config.at("indexer_book_shift").as_int64()) { work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; @@ -65,11 +66,59 @@ BackendIndexer::deleteBookOffer( } void -writeFlagLedger( +writeKeyFlagLedger( + uint32_t ledgerSequence, + uint32_t shift, + BackendInterface const& backend, + std::unordered_set const& keys) +{ + uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift)); + ripple::uint256 zero = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. ledgerSequence = " << std::to_string(ledgerSequence) + << " nextFlag = " << std::to_string(nextFlag) + << " keys.size() = " << std::to_string(keys.size()); + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchLedgerPage({}, nextFlag, 1); + if (!warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " flag ledger already written. sequence = " + << std::to_string(ledgerSequence) + << " next flag = " << std::to_string(nextFlag) + << "returning"; + return; + } + break; + } + catch (DatabaseTimeout& t) + { + ; + } + } + auto start = std::chrono::system_clock::now(); + + backend.writeKeys(keys, nextFlag); + backend.writeKeys({zero}, nextFlag); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. ledgerSequence = " << std::to_string(ledgerSequence) + << " nextFlag = " << std::to_string(nextFlag) + << " keys.size() = " << std::to_string(keys.size()) + << std::chrono::duration_cast(end - start) + .count(); +} +void +writeBookFlagLedger( uint32_t ledgerSequence, uint32_t shift, BackendInterface const& backend, - std::unordered_set const& keys, std::unordered_map< ripple::uint256, std::unordered_set> const& books) @@ -81,15 +130,14 @@ writeFlagLedger( << __func__ << " starting. ledgerSequence = " << std::to_string(ledgerSequence) << " nextFlag = " << std::to_string(nextFlag) - << " keys.size() = " << std::to_string(keys.size()) << " books.size() = " << std::to_string(books.size()); while (true) { try { auto [objects, curCursor, warning] = - backend.fetchLedgerPage({}, nextFlag, 1); - if (!(warning || objects.size() == 0)) + backend.fetchBookOffers(zero, nextFlag, 1); + if (!warning) { BOOST_LOG_TRIVIAL(warning) << __func__ << " flag ledger already written. sequence = " @@ -109,16 +157,11 @@ writeFlagLedger( backend.writeBooks(books, nextFlag); backend.writeBooks({{zero, {zero}}}, nextFlag); - BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ..."; - - backend.writeKeys(keys, nextFlag); - backend.writeKeys({zero}, nextFlag); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << " finished. ledgerSequence = " << std::to_string(ledgerSequence) << " nextFlag = " << std::to_string(nextFlag) - << " keys.size() = " << std::to_string(keys.size()) << " books.size() = " << std::to_string(books.size()) << " time = " << std::chrono::duration_cast(end - start) .count(); @@ -132,7 +175,80 @@ BackendIndexer::clearCaches() } void -BackendIndexer::populateCaches( +BackendIndexer::doBooksRepair( + BackendInterface const& backend, + std::optional sequence) +{ + if (!sequence) + { + auto rng = backend.fetchLedgerRangeNoThrow(); + if (!rng) + return; + sequence = rng->maxSequence; + } + BOOST_LOG_TRIVIAL(info) + << __func__ << " sequence = " << std::to_string(*sequence); + ripple::uint256 zero = {}; + while (true) + { + try + { + auto [objects, cursor, warning] = + backend.fetchBookOffers(zero, *sequence, 1); + if (!warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " flag ledger already written. sequence = " + << std::to_string(*sequence) << "returning"; + return; + } + else + { + uint32_t lower = (*sequence - 1) >> bookShift_ << bookShift_; + doBooksRepair(backend, lower); + } + break; + } + catch (DatabaseTimeout& t) + { + ; + } + } + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, *sequence, 2048); + + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + for (auto& obj : objects) + { + if (isOffer(obj.blob)) + { + auto book = getBook(obj.blob); + booksRepair[book].insert(obj.key); + } + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + writeBookFlagLedger(*sequence, bookShift_, backend, booksRepair); + booksRepair = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ << " finished. sequence = " << std::to_string(*sequence); +} +void +BackendIndexer::doKeysRepair( BackendInterface const& backend, std::optional sequence) { @@ -152,16 +268,65 @@ BackendIndexer::populateCaches( { auto [objects, curCursor, warning] = backend.fetchLedgerPage(cursor, *sequence, 2048); - if (warning) + // no cursor means this is the first page + if (!cursor) { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " performing index repair"; - uint32_t lower = (*sequence - 1) >> shift_ << shift_; - populateCaches(backend, lower); - writeFlagLedger( - lower, shift_, backend, keysCumulative, booksCumulative); - clearCaches(); + // if there is no warning, we don't need to do a repair + // warning only shows up on the first page + if (!warning) + { + BOOST_LOG_TRIVIAL(info) + << __func__ + << " flag ledger already written. returning"; + return; + } + else + { + uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_; + doKeysRepair(backend, lower); + } } + + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + for (auto& obj : objects) + { + keysRepair.insert(obj.key); + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + writeKeyFlagLedger(*sequence, keyShift_, backend, keysRepair); + keysRepair = {}; + BOOST_LOG_TRIVIAL(info) + << __func__ << " finished. sequence = " << std::to_string(*sequence); +} + +void +BackendIndexer::populateCaches(BackendInterface const& backend) +{ + auto rng = backend.fetchLedgerRangeNoThrow(); + if (!rng) + return; + uint32_t sequence = rng->maxSequence; + BOOST_LOG_TRIVIAL(info) + << __func__ << " sequence = " << std::to_string(sequence); + doBooksRepair(backend, sequence); + doKeysRepair(backend, sequence); + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, sequence, 2048); BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) @@ -189,7 +354,6 @@ BackendIndexer::populateCaches( std::unique_lock lck(mtx); populatingCacheAsync = false; } - auto tip = backend.fetchLatestLedgerSequence(); for (auto& key : deletedKeys) { deleteKey(key); @@ -212,9 +376,7 @@ BackendIndexer::populateCaches( << " finished. keys.size() = " << std::to_string(keysCumulative.size()); } void -BackendIndexer::populateCachesAsync( - BackendInterface const& backend, - std::optional sequence) +BackendIndexer::populateCachesAsync(BackendInterface const& backend) { if (keysCumulative.size() > 0) { @@ -226,11 +388,8 @@ BackendIndexer::populateCachesAsync( std::unique_lock lck(mtx); populatingCacheAsync = true; } - BOOST_LOG_TRIVIAL(info) - << __func__ << " seq = " << (sequence ? std::to_string(*sequence) : ""); - boost::asio::post(ioc_, [this, sequence, &backend]() { - populateCaches(backend, sequence); - }); + BOOST_LOG_TRIVIAL(info) << __func__; + boost::asio::post(ioc_, [this, &backend]() { populateCaches(backend); }); } void @@ -243,7 +402,25 @@ BackendIndexer::waitForCaches() } void -BackendIndexer::writeFlagLedgerAsync( +BackendIndexer::writeKeyFlagLedgerAsync( + uint32_t ledgerSequence, + BackendInterface const& backend) +{ + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. sequence = " << std::to_string(ledgerSequence); + + waitForCaches(); + auto keysCopy = keysCumulative; + boost::asio::post(ioc_, [=, this, &backend]() { + writeKeyFlagLedger(ledgerSequence, keyShift_, backend, keysCopy); + }); + BOOST_LOG_TRIVIAL(info) + << __func__ + << " finished. sequence = " << std::to_string(ledgerSequence); +} +void +BackendIndexer::writeBookFlagLedgerAsync( uint32_t ledgerSequence, BackendInterface const& backend) { @@ -253,9 +430,8 @@ BackendIndexer::writeFlagLedgerAsync( waitForCaches(); auto booksCopy = booksCumulative; - auto keysCopy = keysCumulative; boost::asio::post(ioc_, [=, this, &backend]() { - writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy); + writeBookFlagLedger(ledgerSequence, bookShift_, backend, booksCopy); }); BOOST_LOG_TRIVIAL(info) << __func__ @@ -269,21 +445,23 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) << __func__ << " starting. sequence = " << std::to_string(ledgerSequence); bool isFirst = false; - uint32_t index = getIndexOfSeq(ledgerSequence); + uint32_t keyIndex = getKeyIndexOfSeq(ledgerSequence); + uint32_t bookIndex = getKeyIndexOfSeq(ledgerSequence); auto rng = backend.fetchLedgerRangeNoThrow(); if (!rng || rng->minSequence == ledgerSequence) { isFirst = true; - index = ledgerSequence; + keyIndex = bookIndex = ledgerSequence; } - backend.writeKeys(keys, index); - backend.writeBooks(books, index); + backend.writeKeys(keys, keyIndex); + backend.writeBooks(books, bookIndex); if (isFirst) { ripple::uint256 zero = {}; backend.writeBooks({{zero, {zero}}}, ledgerSequence); backend.writeKeys({zero}, ledgerSequence); - writeFlagLedgerAsync(ledgerSequence, backend); + writeBookFlagLedgerAsync(ledgerSequence, backend); + writeKeyFlagLedgerAsync(ledgerSequence, backend); } keys = {}; books = {}; diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index b7743b1b..e73ed390 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -68,7 +68,8 @@ class BackendIndexer std::mutex mutex_; std::optional work_; std::thread ioThread_; - uint32_t shift_ = 16; + uint32_t keyShift_ = 20; + uint32_t bookShift_ = 10; std::unordered_set keys; std::unordered_set keysCumulative; std::unordered_map> @@ -80,6 +81,9 @@ class BackendIndexer std::unordered_set deletedKeys; std::unordered_map> deletedBooks; + std::unordered_set keysRepair; + std::unordered_map> + booksRepair; std::mutex mtx; std::condition_variable cv_; @@ -95,13 +99,9 @@ public: ~BackendIndexer(); void - populateCachesAsync( - BackendInterface const& backend, - std::optional sequence = {}); + populateCachesAsync(BackendInterface const& backend); void - populateCaches( - BackendInterface const& backend, - std::optional sequence = {}); + populateCaches(BackendInterface const& backend); void clearCaches(); // Blocking, possibly for minutes @@ -123,26 +123,56 @@ public: void finish(uint32_t ledgerSequence, BackendInterface const& backend); void - writeFlagLedgerAsync( + writeKeyFlagLedgerAsync( uint32_t ledgerSequence, BackendInterface const& backend); + void + writeBookFlagLedgerAsync( + uint32_t ledgerSequence, + BackendInterface const& backend); + void + doKeysRepair( + BackendInterface const& backend, + std::optional sequence); + void + doBooksRepair( + BackendInterface const& backend, + std::optional sequence); uint32_t - getShift() + getBookShift() { - return shift_; + return bookShift_; } uint32_t - getIndexOfSeq(uint32_t seq) const + getKeyShift() { - if (isFlagLedger(seq)) + return keyShift_; + } + uint32_t + getKeyIndexOfSeq(uint32_t seq) const + { + if (isKeyFlagLedger(seq)) return seq; - auto incr = (1 << shift_); - return (seq >> shift_ << shift_) + incr; + auto incr = (1 << keyShift_); + return (seq >> keyShift_ << keyShift_) + incr; } bool - isFlagLedger(uint32_t ledgerSequence) const + isKeyFlagLedger(uint32_t ledgerSequence) const { - return (ledgerSequence % (1 << shift_)) == 0; + return (ledgerSequence % (1 << keyShift_)) == 0; + } + uint32_t + getBookIndexOfSeq(uint32_t seq) const + { + if (isBookFlagLedger(seq)) + return seq; + auto incr = (1 << bookShift_); + return (seq >> bookShift_ << bookShift_) + incr; + } + bool + isBookFlagLedger(uint32_t ledgerSequence) const + { + return (ledgerSequence % (1 << bookShift_)) == 0; } }; @@ -164,16 +194,28 @@ public: } std::optional - getIndexOfSeq(uint32_t seq) const + getKeyIndexOfSeq(uint32_t seq) const { - if (indexer_.isFlagLedger(seq)) + if (indexer_.isKeyFlagLedger(seq)) return seq; auto rng = fetchLedgerRange(); if (!rng) return {}; if (rng->minSequence == seq) return seq; - return indexer_.getIndexOfSeq(seq); + return indexer_.getKeyIndexOfSeq(seq); + } + std::optional + getBookIndexOfSeq(uint32_t seq) const + { + if (indexer_.isBookFlagLedger(seq)) + return seq; + auto rng = fetchLedgerRange(); + if (!rng) + return {}; + if (rng->minSequence == seq) + return seq; + return indexer_.getBookIndexOfSeq(seq); } bool @@ -183,8 +225,10 @@ public: auto commitRes = doFinishWrites(); if (commitRes) { - if (indexer_.isFlagLedger(ledgerSequence)) - indexer_.writeFlagLedgerAsync(ledgerSequence, *this); + if (indexer_.isBookFlagLedger(ledgerSequence)) + indexer_.writeBookFlagLedgerAsync(ledgerSequence, *this); + if (indexer_.isKeyFlagLedger(ledgerSequence)) + indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this); } return commitRes; } diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 3e810ed4..98e0a188 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -399,7 +399,7 @@ CassandraBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { - auto index = getIndexOfSeq(ledgerSequence); + auto index = getKeyIndexOfSeq(ledgerSequence); if (!index) return {}; LedgerPage page; @@ -449,11 +449,11 @@ CassandraBackend::fetchLedgerPage( page.objects.push_back({std::move(key), std::move(obj)}); } } - if (keys.size() && !cursor && !keys[0].isZero()) + if (!keys.size() || (!cursor && !keys[0].isZero())) page.warning = "Data may be incomplete"; return page; } - return {{}, {}}; + return {{}, {}, "Data may be incomplete"}; } std::vector CassandraBackend::fetchLedgerObjects( @@ -498,7 +498,7 @@ CassandraBackend::fetchBookOffers( { CassandraStatement statement{selectBook_}; statement.bindBytes(book); - auto index = getIndexOfSeq(sequence); + auto index = getBookIndexOfSeq(sequence); if (!index) return {}; BOOST_LOG_TRIVIAL(info) << __func__ << " index = " << std::to_string(*index) @@ -517,7 +517,7 @@ CassandraBackend::fetchBookOffers( BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; std::vector keys; if (!result) - return {{}, {}}; + return {{}, {}, "Data may be incomplete"}; do { keys.push_back(result.getUInt256()); @@ -535,17 +535,17 @@ CassandraBackend::fetchBookOffers( results.push_back({keys[i], objs[i]}); } std::optional warning; - if (keys[0].isZero()) + if (!cursor && !keys[0].isZero()) warning = "Data may be incomplete"; if (keys.size() == limit) return {results, keys[keys.size() - 1], warning}; else return {results, {}, warning}; - - return {{}, {}}; } + else if (!cursor) + return {{}, {}, "Data may be incomplete"}; - return {{}, {}}; + return {}; } struct WriteBookCallbackData { diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 969f6572..af3d9c18 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -329,7 +329,7 @@ PostgresBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { - auto index = getIndexOfSeq(ledgerSequence); + auto index = getKeyIndexOfSeq(ledgerSequence); if (!index) return {}; PgQuery pgQuery(pgPool_); @@ -366,6 +366,8 @@ PostgresBackend::fetchLedgerPage( return {results, returnCursor, "Data may be incomplete"}; return {results, returnCursor}; } + if (!cursor) + return {{}, {}, "Data may be incomplete"}; return {}; } diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 3306c2fd..8234fd4d 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -300,7 +300,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); accumTxns_ += rawData.transactions_list().transactions_size(); bool success = true; - if (accumTxns_ > txnThreshold_) + if (accumTxns_ >= txnThreshold_) { auto start = std::chrono::system_clock::now(); success = flatMapBackend_->finishWrites(lgrInfo.seq);