diff --git a/reporting/BackendInterface.cpp b/reporting/BackendInterface.cpp new file mode 100644 index 00000000..ebf3abfe --- /dev/null +++ b/reporting/BackendInterface.cpp @@ -0,0 +1,280 @@ +#include +#include +#include +namespace Backend { +bool +BackendInterface::finishWrites(uint32_t ledgerSequence) const +{ + indexer_.finish(ledgerSequence, *this); + auto commitRes = doFinishWrites(); + if (commitRes) + { + if (isFirst_) + indexer_.doKeysRepairAsync(*this, ledgerSequence); + if (indexer_.isKeyFlagLedger(ledgerSequence)) + indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this); + isFirst_ = false; + } + else + { + // if commitRes is false, we are relinquishing control of ETL. We + // reset isFirst_ to true so that way if we later regain control of + // ETL, we trigger the index repair + isFirst_ = true; + } + return commitRes; +} +bool +BackendInterface::isLedgerIndexed(std::uint32_t ledgerSequence) const +{ + auto keyIndex = getKeyIndexOfSeq(ledgerSequence); + if (keyIndex) + { + auto page = doFetchLedgerPage({}, ledgerSequence, 1); + return !page.warning.has_value(); + } + return false; +} +void +BackendInterface::writeLedgerObject( + std::string&& key, + uint32_t seq, + std::string&& blob, + bool isCreated, + bool isDeleted, + std::optional&& book) const +{ + ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); + indexer_.addKey(std::move(key256)); + doWriteLedgerObject( + std::move(key), + seq, + std::move(blob), + isCreated, + isDeleted, + std::move(book)); +} +std::optional +BackendInterface::fetchLedgerRangeNoThrow() const +{ + BOOST_LOG_TRIVIAL(warning) << __func__; + while (true) + { + try + { + return fetchLedgerRange(); + } + catch (DatabaseTimeout& t) + { + ; + } + } +} +std::optional +BackendInterface::getKeyIndexOfSeq(uint32_t seq) const +{ + if (indexer_.isKeyFlagLedger(seq)) + return KeyIndex{seq}; + auto rng = fetchLedgerRange(); + if (!rng) + return {}; + if (rng->minSequence == seq) + return KeyIndex{seq}; + return indexer_.getKeyIndexOfSeq(seq); +} +BookOffersPage +BackendInterface::fetchBookOffers( + ripple::uint256 const& book, + uint32_t ledgerSequence, + std::uint32_t limit, + std::optional const& cursor) const +{ + BookOffersPage page; + const ripple::uint256 bookEnd = ripple::getQualityNext(book); + ripple::uint256 uTipIndex = book; + bool done = false; + while (page.offers.size() < limit) + { + auto offerDir = fetchSuccessor(uTipIndex, ledgerSequence); + if (!offerDir || offerDir->key > bookEnd) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " - offerDir.has_value() " + << offerDir.has_value() << " breaking"; + break; + } + while (page.offers.size() < limit) + { + uTipIndex = offerDir->key; + ripple::STLedgerEntry sle{ + ripple::SerialIter{ + offerDir->blob.data(), offerDir->blob.size()}, + offerDir->key}; + auto indexes = sle.getFieldV256(ripple::sfIndexes); + std::vector keys; + keys.insert(keys.end(), indexes.begin(), indexes.end()); + auto objs = fetchLedgerObjects(keys, ledgerSequence); + for (size_t i = 0; i < keys.size(); ++i) + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " key = " << ripple::strHex(keys[i]) + << " blob = " << ripple::strHex(objs[i]); + if (objs[i].size()) + page.offers.push_back({keys[i], objs[i]}); + } + auto next = sle.getFieldU64(ripple::sfIndexNext); + if (!next) + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " next is empty. breaking"; + break; + } + auto nextKey = ripple::keylet::page(uTipIndex, next); + auto nextDir = fetchLedgerObject(nextKey.key, ledgerSequence); + assert(nextDir); + offerDir->blob = *nextDir; + offerDir->key = nextKey.key; + } + } + + return page; +} + +std::optional +BackendInterface::fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence) + const +{ + auto page = fetchLedgerPage({++key}, ledgerSequence, 1); + if (page.objects.size()) + return page.objects[0]; + return {}; +} +LedgerPage +BackendInterface::fetchLedgerPage( + std::optional const& cursor, + std::uint32_t ledgerSequence, + std::uint32_t limit) const +{ + assert(limit != 0); + bool incomplete = !isLedgerIndexed(ledgerSequence); + // really low limits almost always miss + uint32_t adjustedLimit = std::max(limit, (uint32_t)4); + LedgerPage page; + page.cursor = cursor; + do + { + adjustedLimit = adjustedLimit > 2048 ? 2048 : adjustedLimit * 2; + auto partial = + doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit); + page.objects.insert( + page.objects.end(), partial.objects.begin(), partial.objects.end()); + page.cursor = partial.cursor; + } while (page.objects.size() < limit && page.cursor); + if (incomplete) + { + auto rng = fetchLedgerRange(); + if (!rng) + return page; + if (rng->minSequence == ledgerSequence) + { + BOOST_LOG_TRIVIAL(fatal) + << __func__ + << " Database is populated but first flag ledger is " + "incomplete. This should never happen"; + assert(false); + throw std::runtime_error("Missing base flag ledger"); + } + uint32_t lowerSequence = (ledgerSequence - 1) >> indexer_.getKeyShift() + << indexer_.getKeyShift(); + if (lowerSequence < rng->minSequence) + lowerSequence = rng->minSequence; + BOOST_LOG_TRIVIAL(debug) + << __func__ + << " recursing. ledgerSequence = " << std::to_string(ledgerSequence) + << " , lowerSequence = " << std::to_string(lowerSequence); + auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit); + std::vector keys; + std::transform( + std::move_iterator(lowerPage.objects.begin()), + std::move_iterator(lowerPage.objects.end()), + std::back_inserter(keys), + [](auto&& elt) { return std::move(elt.key); }); + auto objs = fetchLedgerObjects(keys, ledgerSequence); + for (size_t i = 0; i < keys.size(); ++i) + { + auto& obj = objs[i]; + auto& key = keys[i]; + if (obj.size()) + page.objects.push_back({std::move(key), std::move(obj)}); + } + std::sort(page.objects.begin(), page.objects.end(), [](auto a, auto b) { + return a.key < b.key; + }); + page.warning = "Data may be incomplete"; + } + if (page.objects.size() >= limit) + { + page.objects.resize(limit); + page.cursor = page.objects.back().key; + } + return page; +} + +void +BackendInterface::checkFlagLedgers() const +{ + auto rng = fetchLedgerRangeNoThrow(); + if (rng) + { + bool prevComplete = true; + uint32_t cur = rng->minSequence; + size_t numIncomplete = 0; + while (cur <= rng->maxSequence + 1) + { + auto keyIndex = getKeyIndexOfSeq(cur); + assert(keyIndex.has_value()); + cur = keyIndex->keyIndex; + + if (!isLedgerIndexed(cur)) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " - flag ledger " + << std::to_string(keyIndex->keyIndex) << " is incomplete"; + ++numIncomplete; + prevComplete = false; + } + else + { + if (!prevComplete) + { + BOOST_LOG_TRIVIAL(fatal) + << __func__ << " - flag ledger " + << std::to_string(keyIndex->keyIndex) + << " is incomplete but the next is complete. This " + "should never happen"; + assert(false); + throw std::runtime_error("missing prev flag ledger"); + } + prevComplete = true; + BOOST_LOG_TRIVIAL(info) + << __func__ << " - flag ledger " + << std::to_string(keyIndex->keyIndex) << " is complete"; + } + cur = cur + 1; + } + if (numIncomplete > 1) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " " << std::to_string(numIncomplete) + << " incomplete flag ledgers. " + "This can happen, but is unlikely. Check indexer_key_shift " + "in config"; + } + else + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " number of incomplete flag ledgers = " + << std::to_string(numIncomplete); + } + } +} +} // namespace Backend