diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index af0f49bd..fe4f5510 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -5,101 +5,20 @@ BackendIndexer::BackendIndexer(boost::json::object const& config) { if (config.contains("indexer_key_shift")) keyShift_ = config.at("indexer_key_shift").as_int64(); - if (config.contains("indexer_book_shift")) - bookShift_ = config.at("indexer_book_shift").as_int64(); work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; - updateThread_ = std::thread{[this]() { ioc_.run(); }}; }; BackendIndexer::~BackendIndexer() { std::unique_lock lck(mutex_); work_.reset(); ioThread_.join(); - updateThread_.join(); -} -void -BackendIndexer::writeLedgerObject( - ripple::uint256&& key, - std::optional&& book, - bool isCreated, - bool isDeleted) -{ - ++updatesOutstanding_; - boost::asio::post( - ioc_, - [this, - key = std::move(key), - isCreated, - isDeleted, - book = std::move(book)]() { - if (isCreated) - addKey(key); - if (isDeleted) - deleteKey(key); - if (book) - { - if (isCreated) - addBookOffer(*book, key); - if (isDeleted) - deleteBookOffer(*book, key); - } - --updatesOutstanding_; - { - std::unique_lock lck(mtx); - updateCv_.notify_one(); - } - }); } void -BackendIndexer::addKey(ripple::uint256 const& key) +BackendIndexer::addKey(ripple::uint256&& key) { - std::unique_lock lck(mtx); - keys.insert(key); - keysCumulative.insert(key); -} -void -BackendIndexer::addKeyAsync(ripple::uint256 const& key) -{ - std::unique_lock lck(mtx); - keysCumulative.insert(key); -} -void -BackendIndexer::deleteKey(ripple::uint256 const& key) -{ - std::unique_lock lck(mtx); - keysCumulative.erase(key); - if (populatingCacheAsync) - deletedKeys.insert(key); -} - -void -BackendIndexer::addBookOffer( - ripple::uint256 const& book, - ripple::uint256 const& offerKey) -{ - std::unique_lock lck(mtx); - books[book].insert(offerKey); - booksCumulative[book].insert(offerKey); -} -void -BackendIndexer::addBookOfferAsync( - ripple::uint256 const& book, - ripple::uint256 const& offerKey) -{ - std::unique_lock lck(mtx); - booksCumulative[book].insert(offerKey); -} -void -BackendIndexer::deleteBookOffer( - ripple::uint256 const& book, - ripple::uint256 const& offerKey) -{ - std::unique_lock lck(mtx); - booksCumulative[book].erase(offerKey); - if (populatingCacheAsync) - deletedBooks[book].insert(offerKey); + keys.insert(std::move(key)); } void @@ -152,123 +71,6 @@ writeKeyFlagLedger( .count(); } void -writeBookFlagLedger( - uint32_t ledgerSequence, - uint32_t shift, - BackendInterface const& backend, - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books) -{ - uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift)); - ripple::uint256 zero = {}; - BOOST_LOG_TRIVIAL(info) - << __func__ - << " starting. ledgerSequence = " << std::to_string(ledgerSequence) - << " nextFlag = " << std::to_string(nextFlag) - << " books.size() = " << std::to_string(books.size()); - - auto start = std::chrono::system_clock::now(); - backend.writeBooks(books, BookIndex{nextFlag}, true); - backend.writeBooks({{zero, {zero}}}, BookIndex{nextFlag}, true); - - auto end = std::chrono::system_clock::now(); - - BOOST_LOG_TRIVIAL(info) - << __func__ - << " finished. 
ledgerSequence = " << std::to_string(ledgerSequence) - << " nextFlag = " << std::to_string(nextFlag) - << " books.size() = " << std::to_string(books.size()) << " time = " - << std::chrono::duration_cast(end - start) - .count(); -} - -void -BackendIndexer::clearCaches() -{ - keysCumulative = {}; - booksCumulative = {}; -} - -void -BackendIndexer::doBooksRepair( - BackendInterface const& backend, - std::optional sequence) -{ - auto rng = backend.fetchLedgerRangeNoThrow(); - - if (!rng) - return; - - if (!sequence) - sequence = rng->maxSequence; - - if (sequence < rng->minSequence) - sequence = rng->minSequence; - - BOOST_LOG_TRIVIAL(info) - << __func__ << " sequence = " << std::to_string(*sequence); - - ripple::uint256 zero = {}; - while (true) - { - try - { - auto [objects, cursor, warning] = - backend.fetchBookOffers(zero, *sequence, 1); - if (!warning) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " flag ledger already written. sequence = " - << std::to_string(*sequence) << "returning"; - return; - } - else - { - uint32_t lower = (*sequence - 1) >> bookShift_ << bookShift_; - doBooksRepair(backend, lower); - } - break; - } - catch (DatabaseTimeout& t) - { - ; - } - } - std::optional cursor; - while (true) - { - try - { - auto [objects, curCursor, warning] = - backend.fetchLedgerPage(cursor, *sequence, 2048); - - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - for (auto& obj : objects) - { - if (isOffer(obj.blob)) - { - auto book = getBook(obj.blob); - booksRepair[book].insert(obj.key); - } - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - writeBookFlagLedger(*sequence, bookShift_, backend, booksRepair); - booksRepair = {}; - BOOST_LOG_TRIVIAL(info) - << __func__ << " finished. sequence = " << std::to_string(*sequence); -} -void BackendIndexer::doKeysRepair( BackendInterface const& backend, std::optional sequence) @@ -293,34 +95,23 @@ BackendIndexer::doKeysRepair( try { auto [objects, curCursor, warning] = - backend.fetchLedgerPage(cursor, *sequence, 2048); + backend.fetchLedgerPage({}, *sequence, 1); // no cursor means this is the first page - if (!cursor) + // if there is no warning, we don't need to do a repair + // warning only shows up on the first page + if (!warning) { - // if there is no warning, we don't need to do a repair - // warning only shows up on the first page - if (!warning) - { - BOOST_LOG_TRIVIAL(info) - << __func__ - << " flag ledger already written. returning"; - return; - } - else - { - uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_; - doKeysRepair(backend, lower); - } + BOOST_LOG_TRIVIAL(debug) + << __func__ << " flag ledger already written. returning"; + return; } - - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - for (auto& obj : objects) + else { - keysRepair.insert(obj.key); + uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_; + doKeysRepair(backend, lower); + writeKeyFlagLedgerAsync(lower, backend); + return; } - if (!cursor) - break; } catch (DatabaseTimeout const& e) { @@ -329,104 +120,10 @@ BackendIndexer::doKeysRepair( std::this_thread::sleep_for(std::chrono::seconds(2)); } } - writeKeyFlagLedger(*sequence, keyShift_, backend, keysRepair); - keysRepair = {}; BOOST_LOG_TRIVIAL(info) << __func__ << " finished. 
sequence = " << std::to_string(*sequence); } -void -BackendIndexer::populateCaches(BackendInterface const& backend) -{ - auto rng = backend.fetchLedgerRangeNoThrow(); - if (!rng) - return; - uint32_t sequence = rng->maxSequence; - BOOST_LOG_TRIVIAL(info) - << __func__ << " sequence = " << std::to_string(sequence); - doBooksRepair(backend, sequence); - doKeysRepair(backend, sequence); - std::optional cursor; - while (true) - { - try - { - auto [objects, curCursor, warning] = - backend.fetchLedgerPage(cursor, sequence, 2048); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - for (auto& obj : objects) - { - addKeyAsync(obj.key); - if (isOffer(obj.blob)) - { - auto book = getBook(obj.blob); - addBookOfferAsync(book, obj.key); - } - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - // Do reconcilation. Remove anything from keys or books that shouldn't - // be there - { - std::unique_lock lck(mtx); - populatingCacheAsync = false; - } - for (auto& key : deletedKeys) - { - deleteKey(key); - } - for (auto& book : deletedBooks) - { - for (auto& offer : book.second) - { - deleteBookOffer(book.first, offer); - } - } - { - std::unique_lock lck(mtx); - deletedKeys = {}; - deletedBooks = {}; - cacheCv_.notify_one(); - } - BOOST_LOG_TRIVIAL(info) - << __func__ - << " finished. keys.size() = " << std::to_string(keysCumulative.size()); -} -void -BackendIndexer::populateCachesAsync(BackendInterface const& backend) -{ - if (keysCumulative.size() > 0) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " caches already populated. returning"; - return; - } - { - std::unique_lock lck(mtx); - populatingCacheAsync = true; - } - BOOST_LOG_TRIVIAL(info) << __func__; - boost::asio::post(ioc_, [this, &backend]() { populateCaches(backend); }); -} - -void -BackendIndexer::waitForCaches() -{ - std::unique_lock lck(mtx); - cacheCv_.wait(lck, [this]() { - return !populatingCacheAsync && deletedKeys.size() == 0; - }); -} - void BackendIndexer::writeKeyFlagLedgerAsync( uint32_t ledgerSequence, @@ -436,28 +133,82 @@ BackendIndexer::writeKeyFlagLedgerAsync( << __func__ << " starting. sequence = " << std::to_string(ledgerSequence); - waitForCaches(); - auto keysCopy = keysCumulative; - boost::asio::post(ioc_, [=, this, &backend]() { - writeKeyFlagLedger(ledgerSequence, keyShift_, backend, keysCopy); - }); - BOOST_LOG_TRIVIAL(info) - << __func__ - << " finished. sequence = " << std::to_string(ledgerSequence); -} -void -BackendIndexer::writeBookFlagLedgerAsync( - uint32_t ledgerSequence, - BackendInterface const& backend) -{ - BOOST_LOG_TRIVIAL(info) - << __func__ - << " starting. 
sequence = " << std::to_string(ledgerSequence); + boost::asio::post(ioc_, [this, ledgerSequence, &backend]() { + std::unordered_set keys; + auto nextFlag = getKeyIndexOfSeq(ledgerSequence + 1); + BOOST_LOG_TRIVIAL(info) + << "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex) + << " starting"; + ripple::uint256 zero = {}; + std::optional cursor; + size_t numKeys = 0; + auto begin = std::chrono::system_clock::now(); + while (true) + { + try + { + auto start = std::chrono::system_clock::now(); + auto [objects, curCursor, warning] = + backend.fetchLedgerPage(cursor, ledgerSequence, 2048); + auto mid = std::chrono::system_clock::now(); + // no cursor means this is the first page + if (!cursor) + { + // if there is no warning, we don't need to do a repair + // warning only shows up on the first page + if (warning) + { + BOOST_LOG_TRIVIAL(error) + << "writeKeyFlagLedger - " + << " prev flag ledger not written " + << std::to_string(nextFlag.keyIndex) << " : " + << std::to_string(ledgerSequence); + assert(false); + throw std::runtime_error("Missing prev flag"); + } + } - waitForCaches(); - auto booksCopy = booksCumulative; - boost::asio::post(ioc_, [=, this, &backend]() { - writeBookFlagLedger(ledgerSequence, bookShift_, backend, booksCopy); + cursor = curCursor; + for (auto& obj : objects) + { + keys.insert(obj.key); + } + backend.writeKeys(keys, nextFlag, true); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(debug) + << "writeKeyFlagLedger - " + << std::to_string(nextFlag.keyIndex) << " fetched a page " + << " cursor = " + << (cursor.has_value() ? ripple::strHex(*cursor) + : std::string{}) + << " num keys = " << std::to_string(numKeys) + << " fetch time = " + << std::chrono::duration_cast( + mid - start) + .count() + << " write time = " + << std::chrono::duration_cast( + end - mid) + .count(); + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + backend.writeKeys({zero}, nextFlag, true); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex) + << " finished. " + << " num keys = " << std::to_string(numKeys) << " total time = " + << std::chrono::duration_cast( + end - begin) + .count(); }); BOOST_LOG_TRIVIAL(info) << __func__ @@ -472,7 +223,6 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) << " starting. sequence = " << std::to_string(ledgerSequence); bool isFirst = false; auto keyIndex = getKeyIndexOfSeq(ledgerSequence); - auto bookIndex = getBookIndexOfSeq(ledgerSequence); if (isFirst_) { auto rng = backend.fetchLedgerRangeNoThrow(); @@ -481,26 +231,18 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) else { keyIndex = KeyIndex{ledgerSequence}; - bookIndex = BookIndex{ledgerSequence}; } } - { - std::unique_lock lck(mtx); - updateCv_.wait(lck, [this]() { return updatesOutstanding_ == 0; }); - } backend.writeKeys(keys, keyIndex); - backend.writeBooks(books, bookIndex); if (isFirst_) { // write completion record ripple::uint256 zero = {}; - backend.writeBooks({{zero, {zero}}}, bookIndex); backend.writeKeys({zero}, keyIndex); } isFirst_ = false; keys = {}; - books = {}; BOOST_LOG_TRIVIAL(info) << __func__ << " finished. 
sequence = " << std::to_string(ledgerSequence); diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 78f54221..fb60a107 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -83,70 +83,18 @@ class BackendIndexer std::mutex mutex_; std::optional work_; std::thread ioThread_; - std::thread updateThread_; - std::atomic_uint32_t updatesOutstanding_ = 0; - std::condition_variable updateCv_; uint32_t keyShift_ = 20; - uint32_t bookShift_ = 10; std::unordered_set keys; - std::unordered_set keysCumulative; - std::unordered_map> - books; - std::unordered_map> - booksCumulative; - bool populatingCacheAsync = false; - // These are only used when the cache is being populated asynchronously - std::unordered_set deletedKeys; - std::unordered_map> - deletedBooks; - std::unordered_set keysRepair; - std::unordered_map> - booksRepair; - std::mutex mtx; - std::condition_variable cacheCv_; mutable bool isFirst_ = true; - void - addKeyAsync(ripple::uint256 const& key); - void - addBookOfferAsync( - ripple::uint256 const& book, - ripple::uint256 const& offerKey); - public: BackendIndexer(boost::json::object const& config); ~BackendIndexer(); void - populateCachesAsync(BackendInterface const& backend); - void - populateCaches(BackendInterface const& backend); - void - clearCaches(); - // Blocking, possibly for minutes - void - waitForCaches(); - - void - writeLedgerObject( - ripple::uint256&& key, - std::optional&& book, - bool isCreated, - bool isDeleted); - - void - addKey(ripple::uint256 const& key); - void - deleteKey(ripple::uint256 const& key); - void - addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey); - - void - deleteBookOffer( - ripple::uint256 const& book, - ripple::uint256 const& offerKey); + addKey(ripple::uint256&& key); void finish(uint32_t ledgerSequence, BackendInterface const& backend); @@ -155,22 +103,9 @@ public: uint32_t ledgerSequence, BackendInterface const& backend); void - writeBookFlagLedgerAsync( - uint32_t ledgerSequence, - BackendInterface const& backend); - void doKeysRepair( BackendInterface const& backend, std::optional sequence); - void - doBooksRepair( - BackendInterface const& backend, - std::optional sequence); - uint32_t - getBookShift() - { - return bookShift_; - } uint32_t getKeyShift() { @@ -191,24 +126,6 @@ public: { return (ledgerSequence % (1 << keyShift_)) == 0; } - BookIndex - getBookIndexOfSeq(uint32_t seq) const - { - if (isBookFlagLedger(seq)) - return BookIndex{seq}; - auto incr = (1 << bookShift_); - BookIndex index{(seq >> bookShift_ << bookShift_) + incr}; - assert(isBookFlagLedger(index.bookIndex)); - assert( - bookShift_ == keyShift_ || !isKeyFlagLedger(index.bookIndex) || - !isKeyFlagLedger(index.bookIndex + incr)); - return index; - } - bool - isBookFlagLedger(uint32_t ledgerSequence) const - { - return (ledgerSequence % (1 << bookShift_)) == 0; - } }; class BackendInterface @@ -241,18 +158,6 @@ public: return KeyIndex{seq}; return indexer_.getKeyIndexOfSeq(seq); } - std::optional - getBookIndexOfSeq(uint32_t seq) const - { - if (indexer_.isBookFlagLedger(seq)) - return BookIndex{seq}; - auto rng = fetchLedgerRange(); - if (!rng) - return {}; - if (rng->minSequence == seq) - return BookIndex{seq}; - return indexer_.getBookIndexOfSeq(seq); - } bool finishWrites(uint32_t ledgerSequence) const @@ -266,9 +171,8 @@ public: auto rng = fetchLedgerRangeNoThrow(); if (rng && rng->minSequence != ledgerSequence) isFirst_ = false; + indexer_.doKeysRepair(*this, ledgerSequence); } - if 
(indexer_.isBookFlagLedger(ledgerSequence) || isFirst_) - indexer_.writeBookFlagLedgerAsync(ledgerSequence, *this); if (indexer_.isKeyFlagLedger(ledgerSequence) || isFirst_) indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this); isFirst_ = false; @@ -315,8 +219,76 @@ public: virtual std::vector fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const = 0; - virtual LedgerPage + LedgerPage fetchLedgerPage( + std::optional const& cursor, + std::uint32_t ledgerSequence, + std::uint32_t limit) const + { + assert(limit != 0); + bool incomplete = false; + { + auto check = doFetchLedgerPage({}, ledgerSequence, 1); + incomplete = check.warning.has_value(); + } + uint32_t adjustedLimit = limit; + LedgerPage page; + page.cursor = cursor; + do + { + adjustedLimit = adjustedLimit > 2048 ? 2048 : adjustedLimit * 2; + auto partial = + doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit); + page.objects.insert( + page.objects.end(), + partial.objects.begin(), + partial.objects.end()); + page.cursor = partial.cursor; + } while (page.objects.size() < limit && page.cursor); + if (incomplete) + { + std::cout << "checking lower" << std::endl; + uint32_t lowerSequence = ledgerSequence >> indexer_.getKeyShift() + << indexer_.getKeyShift(); + auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit); + std::vector keys; + std::transform( + std::move_iterator(lowerPage.objects.begin()), + std::move_iterator(lowerPage.objects.end()), + std::back_inserter(keys), + [](auto&& elt) { return std::move(elt.key); }); + auto objs = fetchLedgerObjects(keys, ledgerSequence); + for (size_t i = 0; i < keys.size(); ++i) + { + auto& obj = objs[i]; + auto& key = keys[i]; + if (obj.size()) + page.objects.push_back({std::move(key), std::move(obj)}); + } + std::sort( + page.objects.begin(), page.objects.end(), [](auto a, auto b) { + return a.key < b.key; + }); + page.warning = "Data may be incomplete"; + } + if (page.objects.size() >= limit) + { + page.objects.resize(limit); + page.cursor = page.objects.back().key; + } + return page; + } + + std::optional + fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence) + { + auto page = fetchLedgerPage({++key}, ledgerSequence, 1); + if (page.objects.size()) + return page.objects[0]; + return {}; + } + virtual LedgerPage + doFetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const = 0; @@ -363,8 +335,7 @@ public: std::optional&& book) const { ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); - indexer_.writeLedgerObject( - std::move(key256), std::move(book), isCreated, isDeleted); + indexer_.addKey(std::move(key256)); doWriteLedgerObject( std::move(key), seq, @@ -418,13 +389,6 @@ public: std::unordered_set const& keys, KeyIndex const& index, bool isAsync = false) const = 0; - virtual bool - writeBooks( - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books, - BookIndex const& index, - bool isAsync = false) const = 0; virtual ~BackendInterface() { diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index d22f0e5a..7c4152ab 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -394,7 +394,7 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const return objects; } LedgerPage -CassandraBackend::fetchLedgerPage( +CassandraBackend::doFetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const @@ -433,7 +433,7 @@ CassandraBackend::fetchLedgerPage( if (keys.size() && keys.size() == 
limit) { page.cursor = keys.back(); - keys.pop_back(); + ++(*page.cursor); } auto objects = fetchLedgerObjects(keys, ledgerSequence); if (objects.size() != keys.size()) @@ -501,124 +501,7 @@ CassandraBackend::fetchBookOffers( std::uint32_t limit, std::optional const& cursor) const { - auto rng = fetchLedgerRange(); - auto limitTuningFactor = 50; - - if (!rng) - return {{}, {}}; - - auto readBooks = - [this, &book, &limit, &limitTuningFactor](std::uint32_t sequence) - -> std::pair< - bool, - std::vector>> { - CassandraStatement completeQuery{completeBook_}; - completeQuery.bindInt(sequence); - CassandraResult completeResult = executeSyncRead(completeQuery); - bool complete = completeResult.hasResult(); - - CassandraStatement statement{selectBook_}; - std::vector> keys = {}; - - statement.bindBytes(book.data(), 24); - statement.bindInt(sequence); - - BOOST_LOG_TRIVIAL(info) - << __func__ << " upper = " << std::to_string(sequence) << " book = " - << ripple::strHex(std::string((char*)book.data(), 24)); - - ripple::uint256 zero = beast::zero; - statement.bindBytes(zero.data(), 8); - statement.bindBytes(zero); - - statement.bindUInt(limit * limitTuningFactor); - - auto start = std::chrono::system_clock::now(); - - CassandraResult result = executeSyncRead(statement); - - auto end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; - - BOOST_LOG_TRIVIAL(info) << "Book directory fetch took " - << std::to_string(duration) << " seconds."; - - BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; - if (!result) - { - return {false, {{}, {}}}; - } - - do - { - auto [quality, index] = result.getBytesTuple(); - std::uint64_t q = 0; - memcpy(&q, quality.data(), 8); - keys.push_back({q, ripple::uint256::fromVoid(index.data())}); - - } while (result.nextRow()); - - return {complete, keys}; - }; - - auto upper = getBookIndexOfSeq(ledgerSequence); - auto [complete, quality_keys] = readBooks(upper->bookIndex); - - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - populated keys. num keys = " << quality_keys.size(); - - std::optional warning = {}; - if (!complete) - { - warning = "Data may be incomplete"; - BOOST_LOG_TRIVIAL(info) << "May be incomplete. 
Fetching other page"; - - auto bookShift = indexer_.getBookShift(); - std::uint32_t lower = upper->bookIndex - (1 << bookShift); - auto originalKeys = std::move(quality_keys); - auto [lowerComplete, otherKeys] = readBooks(lower); - - assert(lowerComplete); - - std::vector> merged_keys; - merged_keys.reserve(originalKeys.size() + otherKeys.size()); - std::merge( - originalKeys.begin(), - originalKeys.end(), - otherKeys.begin(), - otherKeys.end(), - std::back_inserter(merged_keys), - [](auto pair1, auto pair2) { return pair1.first < pair2.first; }); - } - - std::vector merged(quality_keys.size()); - std::transform( - quality_keys.begin(), - quality_keys.end(), - std::back_inserter(merged), - [](auto pair) { return pair.second; }); - - auto uniqEnd = std::unique(merged.begin(), merged.end()); - std::vector keys{merged.begin(), uniqEnd}; - - std::cout << keys.size() << std::endl; - - auto start = std::chrono::system_clock::now(); - std::vector objs = fetchLedgerObjects(keys, ledgerSequence); - auto end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; - - BOOST_LOG_TRIVIAL(info) - << "Book object fetch took " << std::to_string(duration) << " seconds."; - - std::vector results; - for (size_t i = 0; i < objs.size(); ++i) - { - if (objs[i].size() != 0) - results.push_back({keys[i], objs[i]}); - } - - return {results, {}, warning}; + return {}; } // namespace Backend struct WriteBookCallbackData { @@ -907,57 +790,6 @@ CassandraBackend::writeKeys( return true; } -bool -CassandraBackend::writeBooks( - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books, - BookIndex const& index, - bool isAsync) const -{ - BOOST_LOG_TRIVIAL(info) - << __func__ << " Ledger = " << std::to_string(index.bookIndex) - << " . num books = " << std::to_string(books.size()); - std::condition_variable cv; - std::mutex mtx; - std::vector> cbs; - uint32_t concurrentLimit = - isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; - std::atomic_uint32_t numOutstanding = 0; - size_t count = 0; - auto start = std::chrono::system_clock::now(); - for (auto& book : books) - { - for (auto& offer : book.second) - { - ++numOutstanding; - ++count; - cbs.push_back(std::make_shared( - *this, - book.first, - offer, - index.bookIndex, - cv, - mtx, - numOutstanding)); - writeBook(*cbs.back()); - BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; - std::unique_lock lck(mtx); - BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait(lck, [&numOutstanding, concurrentLimit]() { - return numOutstanding < concurrentLimit; - }); - } - } - BOOST_LOG_TRIVIAL(info) << __func__ - << "Submitted all book writes. Waiting for them to " - "finish. num submitted = " - << std::to_string(count); - std::unique_lock lck(mtx); - cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); - BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books"; - return true; -} bool CassandraBackend::isIndexed(uint32_t ledgerSequence) const { @@ -1445,18 +1277,17 @@ CassandraBackend::open(bool readOnly) cass_cluster_set_connect_timeout(cluster, 10000); int ttl = getInt("ttl") ? *getInt("ttl") * 2 : 0; - int keysTtl, - keysIncr = ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0; + int keysTtl = (ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0); + int incr = keysTtl; while (keysTtl < ttl) { - keysTtl += keysIncr; - } - int booksTtl, - booksIncr = ttl != 0 ? 
pow(2, indexer_.getBookShift()) * 4 * 2 : 0; - while (booksTtl < ttl) - { - booksTtl += booksIncr; + keysTtl += incr; } + int booksTtl = 0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " setting ttl to " << std::to_string(ttl) + << " , books ttl to " << std::to_string(booksTtl) << " , keys ttl to " + << std::to_string(keysTtl); auto executeSimpleStatement = [this](std::string const& query) { CassStatement* statement = makeStatement(query.c_str(), 0); @@ -1529,7 +1360,7 @@ CassandraBackend::open(bool readOnly) << " ( key blob, sequence bigint, object blob, PRIMARY " "KEY(key, " "sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND" - << " default_time_to_live = " << ttl; + << " default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; @@ -1544,7 +1375,7 @@ CassandraBackend::open(bool readOnly) << " ( hash blob PRIMARY KEY, ledger_sequence bigint, " "transaction " "blob, metadata blob)" - << " WITH default_time_to_live = " << ttl; + << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; @@ -1571,7 +1402,7 @@ CassandraBackend::open(bool readOnly) << " ( sequence bigint, key blob, PRIMARY KEY " "(sequence, key))" " WITH default_time_to_live = " - << keysTtl; + << std::to_string(keysTtl); if (!executeSimpleStatement(query.str())) continue; @@ -1582,28 +1413,13 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); - query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books" - << " ( book blob, sequence bigint, quality_key tuple, PRIMARY KEY " - "((book, sequence), quality_key)) WITH CLUSTERING ORDER BY " - "(quality_key " - "ASC) AND default_time_to_live = " - << booksTtl; - if (!executeSimpleStatement(query.str())) - continue; - query.str(""); - query << "SELECT * FROM " << tablePrefix << "books" - << " LIMIT 1"; - if (!executeSimpleStatement(query.str())) - continue; - query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" << " ( account blob, seq_idx tuple, " " hash blob, " "PRIMARY KEY " "(account, seq_idx)) WITH " "CLUSTERING ORDER BY (seq_idx desc)" - << " AND default_time_to_live = " << ttl; + << " AND default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; @@ -1617,7 +1433,7 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers" << " ( sequence bigint PRIMARY KEY, header blob )" - << " WITH default_time_to_live = " << ttl; + << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; @@ -1630,7 +1446,7 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes" << " (hash blob PRIMARY KEY, sequence bigint)" - << " WITH default_time_to_live = " << ttl; + << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; @@ -1680,13 +1496,6 @@ CassandraBackend::open(bool readOnly) if (!insertKey_.prepareStatement(query, session_.get())) continue; - query.str(""); - query << "INSERT INTO " << tablePrefix << "books" - << " (book, sequence, quality_key) VALUES (?, ?, (?, ?))"; - if (!insertBook2_.prepareStatement(query, session_.get())) - continue; - query.str(""); - query.str(""); query << "SELECT key FROM " << tablePrefix << "keys" << " WHERE sequence = ? AND key >= ? 
ORDER BY key ASC LIMIT ?"; @@ -1755,23 +1564,6 @@ CassandraBackend::open(bool readOnly) if (!getToken_.prepareStatement(query, session_.get())) continue; - query.str(""); - query << "SELECT quality_key FROM " << tablePrefix << "books " - << " WHERE book = ? AND sequence = ?" - << " AND quality_key >= (?, ?)" - " ORDER BY quality_key ASC " - " LIMIT ?"; - if (!selectBook_.prepareStatement(query, session_.get())) - continue; - - query.str(""); - query << "SELECT * FROM " << tablePrefix << "books " - << "WHERE book = " - << "0x000000000000000000000000000000000000000000000000" - << " AND sequence = ?"; - if (!completeBook_.prepareStatement(query, session_.get())) - continue; - query.str(""); query << " INSERT INTO " << tablePrefix << "account_tx" << " (account, seq_idx, hash) " diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index f9b326ca..09b97a1a 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -957,7 +957,7 @@ public: CassandraResult result = executeSyncRead(statement); if (!result) { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; + BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows"; return {}; } return result.getBytes(); @@ -997,7 +997,7 @@ public: return {{result.getBytes(), result.getBytes(), result.getUInt32()}}; } LedgerPage - fetchLedgerPage( + doFetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const override; @@ -1019,13 +1019,6 @@ public: std::unordered_set const& keys, KeyIndex const& index, bool isAsync = false) const override; - bool - writeBooks( - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books, - BookIndex const& index, - bool isAsync = false) const override; BookOffersPage fetchBookOffers( ripple::uint256 const& book, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index d6bae3c1..55572ed9 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -324,7 +324,7 @@ PostgresBackend::fetchAllTransactionHashesInLedger( } LedgerPage -PostgresBackend::fetchLedgerPage( +PostgresBackend::doFetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const @@ -338,7 +338,7 @@ PostgresBackend::fetchLedgerPage( sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(index->keyIndex); if (cursor) - sql << " AND key > \'\\x" << ripple::strHex(*cursor) << "\'"; + sql << " AND key >= \'\\x" << ripple::strHex(*cursor) << "\'"; sql << " ORDER BY key ASC LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); auto res = pgQuery(sql.str().data()); @@ -352,7 +352,10 @@ PostgresBackend::fetchLedgerPage( keys.push_back({res.asUInt256(i, 0)}); } if (numRows == limit) + { returnCursor = keys.back(); + ++(*returnCursor); + } auto objs = fetchLedgerObjects(keys, ledgerSequence); std::vector results; @@ -379,155 +382,7 @@ PostgresBackend::fetchBookOffers( std::uint32_t limit, std::optional const& cursor) const { - auto rng = fetchLedgerRange(); - auto limitTuningFactor = 50; - - if (!rng) - return {{}, {}}; - - ripple::uint256 bookBase = - ripple::keylet::quality({ripple::ltDIR_NODE, book}, 0).key; - ripple::uint256 bookEnd = ripple::getQualityNext(bookBase); - - using bookKeyPair = std::pair; - auto getBooks = [this, &bookBase, &bookEnd, &limit, &limitTuningFactor]( - std::uint32_t sequence) - -> std::pair> { - BOOST_LOG_TRIVIAL(info) << __func__ << ": Fetching books between " - << "0x" << ripple::strHex(bookBase) << " and " - << "0x" << 
ripple::strHex(bookEnd) - << "at ledger " << std::to_string(sequence); - - auto start = std::chrono::system_clock::now(); - - std::stringstream sql; - sql << "SELECT COUNT(*) FROM books WHERE " - << "book = \'\\x" << ripple::strHex(ripple::uint256(beast::zero)) - << "\' AND ledger_seq = " << std::to_string(sequence); - - bool complete; - PgQuery pgQuery(this->pgPool_); - auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 1)) - complete = res.asInt(0, 0) != 0; - else - return {false, {}}; - - sql.str(""); - sql << "SELECT book, offer_key FROM books " - << "WHERE ledger_seq = " << std::to_string(sequence) - << " AND book >= " - << "\'\\x" << ripple::strHex(bookBase) << "\' " - << "AND book < " - << "\'\\x" << ripple::strHex(bookEnd) << "\' " - << "ORDER BY book ASC " - << "LIMIT " << std::to_string(limit * limitTuningFactor); - - BOOST_LOG_TRIVIAL(debug) << sql.str(); - - res = pgQuery(sql.str().data()); - - auto end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; - - BOOST_LOG_TRIVIAL(info) << "Postgres book key fetch took " - << std::to_string(duration) << " seconds"; - - if (size_t numRows = checkResult(res, 2)) - { - std::vector results(numRows); - for (size_t i = 0; i < numRows; ++i) - { - auto book = res.asUInt256(i, 0); - auto key = res.asUInt256(i, 1); - - results.push_back({std::move(book), std::move(key)}); - } - - return {complete, results}; - } - - return {complete, {}}; - }; - - auto fetchObjects = - [this]( - std::vector const& pairs, - std::uint32_t sequence, - std::uint32_t limit, - std::optional warning) -> BookOffersPage { - std::vector allKeys(pairs.size()); - for (auto const& pair : pairs) - allKeys.push_back(pair.second); - - auto uniqEnd = std::unique(allKeys.begin(), allKeys.end()); - std::vector keys{allKeys.begin(), uniqEnd}; - - auto start = std::chrono::system_clock::now(); - - auto ledgerEntries = fetchLedgerObjects(keys, sequence); - - auto end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; - - BOOST_LOG_TRIVIAL(info) - << "Postgres book objects fetch took " << std::to_string(duration) - << " seconds. 
" - << "Fetched " << std::to_string(ledgerEntries.size()) - << " ledger entries"; - - std::vector objects; - for (auto i = 0; i < ledgerEntries.size(); ++i) - { - if (ledgerEntries[i].size() != 0) - objects.push_back(LedgerObject{keys[i], ledgerEntries[i]}); - } - - return {objects, {}, warning}; - }; - - std::uint32_t bookShift = indexer_.getBookShift(); - auto upper = getBookIndexOfSeq(ledgerSequence); - - auto [upperComplete, upperResults] = getBooks(upper->bookIndex); - - BOOST_LOG_TRIVIAL(info) << __func__ << ": Upper results found " - << upperResults.size() << " books."; - - if (upperComplete) - { - BOOST_LOG_TRIVIAL(info) << "Upper book page is complete"; - return fetchObjects(upperResults, ledgerSequence, limit, {}); - } - - BOOST_LOG_TRIVIAL(info) << "Upper book page is not complete " - << "fetching again"; - - auto lower = upper->bookIndex - (1 << bookShift); - if (lower < rng->minSequence) - lower = rng->minSequence; - - auto [lowerComplete, lowerResults] = getBooks(lower); - - BOOST_LOG_TRIVIAL(info) << __func__ << ": Lower results found " - << lowerResults.size() << " books."; - - assert(lowerComplete); - - std::vector pairs; - pairs.reserve(upperResults.size() + lowerResults.size()); - std::merge( - upperResults.begin(), - upperResults.end(), - lowerResults.begin(), - lowerResults.end(), - std::back_inserter(pairs), - [](bookKeyPair pair1, bookKeyPair pair2) -> bool { - return pair1.first < pair2.first; - }); - - std::optional warning = "book data may be incomplete"; - return fetchObjects(pairs, ledgerSequence, limit, warning); + return {}; } std::vector @@ -841,8 +696,6 @@ PostgresBackend::writeKeys( std::stringstream temp; buffer.swap(temp); numRows = 0; - if (isAsync) - std::this_thread::sleep_for(std::chrono::seconds(1)); } } if (isAsync) @@ -854,54 +707,6 @@ PostgresBackend::writeKeys( return true; } bool -PostgresBackend::writeBooks( - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books, - BookIndex const& index, - bool isAsync) const -{ - if (abortWrite_) - return false; - BOOST_LOG_TRIVIAL(debug) << __func__; - - PgQuery pgQuery(pgPool_); - PgQuery& conn = isAsync ? pgQuery : writeConnection_; - std::stringstream asyncBuffer; - std::stringstream& buffer = isAsync ? asyncBuffer : booksBuffer_; - if (isAsync) - conn("BEGIN"); - size_t numRows = 0; - for (auto& book : books) - { - for (auto& offer : book.second) - { - buffer << std::to_string(index.bookIndex) << '\t' << "\\\\x" - << ripple::strHex(book.first) << '\t' << "\\\\x" - << ripple::strHex(offer) << '\n'; - numRows++; - // If the buffer gets too large, the insert fails. Not sure why. 
- // When writing in the background, we insert after every 10 rows - if ((isAsync && numRows == 1000) || numRows == 100000) - { - conn.bulkInsert("books", buffer.str()); - std::stringstream temp; - buffer.swap(temp); - numRows = 0; - if (isAsync) - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - } - } - if (isAsync) - { - if (numRows > 0) - conn.bulkInsert("books", buffer.str()); - conn("COMMIT"); - } - return true; -} -bool PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { auto rng = fetchLedgerRangeNoThrow(); diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index c4890ecf..d551f42d 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -16,7 +16,7 @@ private: std::shared_ptr pgPool_; mutable PgQuery writeConnection_; mutable bool abortWrite_ = false; - mutable boost::asio::thread_pool pool_{200}; + mutable boost::asio::thread_pool pool_{16}; uint32_t writeInterval_ = 1000000; public: @@ -46,7 +46,7 @@ public: fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override; LedgerPage - fetchLedgerPage( + doFetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const override; @@ -120,13 +120,6 @@ public: std::unordered_set const& keys, KeyIndex const& index, bool isAsync = false) const override; - bool - writeBooks( - std::unordered_map< - ripple::uint256, - std::unordered_set> const& books, - BookIndex const& index, - bool isAsync = false) const override; }; } // namespace Backend #endif diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 023aaed7..9b093a23 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -375,7 +375,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populating caches"; - flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_); BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populated caches"; @@ -541,7 +540,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Extracted and wrote " << *lastPublishedSequence - startSequence << " in " << ((end - begin).count()) / 1000000000.0; writing_ = false; - flatMapBackend_->getIndexer().clearCaches(); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Stopping etl pipeline"; diff --git a/unittests/main.cpp b/unittests/main.cpp new file mode 100644 index 00000000..4ac86e25 --- /dev/null +++ b/unittests/main.cpp @@ -0,0 +1,727 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// Demonstrate some basic assertions. 
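+// The single test below drives each configured backend (Cassandra and
+// Postgres) through the same write/read round trip: ledger headers,
+// transactions, account_tx records, and ledger objects, including paging
+// through fetchLedgerPage.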
+TEST(BackendTest, Basic) +{ + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::warning); + std::string keyspace = + "oceand_test_" + + std::to_string( + std::chrono::system_clock::now().time_since_epoch().count()); + boost::json::object cassandraConfig{ + {"database", + {{"type", "cassandra"}, + {"cassandra", + {{"contact_points", "127.0.0.1"}, + {"port", 9042}, + {"keyspace", keyspace.c_str()}, + {"replication_factor", 1}, + {"table_prefix", ""}, + {"max_requests_outstanding", 1000}, + {"indexer_key_shift", 2}, + {"threads", 8}}}}}}; + boost::json::object postgresConfig{ + {"database", + {{"type", "postgres"}, + {"postgres", + {{"contact_point", "127.0.0.1"}, + {"username", "postgres"}, + {"database", keyspace.c_str()}, + {"password", "postgres"}, + {"indexer_key_shift", 2}, + {"threads", 8}}}}}}; + std::vector configs = { + cassandraConfig, postgresConfig}; + for (auto& config : configs) + { + std::cout << keyspace << std::endl; + auto backend = Backend::makeBackend(config); + backend->open(false); + + std::string rawHeader = + "03C3141A01633CD656F91B4EBB5EB89B791BD34DBC8A04BB6F407C5335BC54351E" + "DD73" + "3898497E809E04074D14D271E4832D7888754F9230800761563A292FA2315A6DB6" + "FE30" + "CC5909B285080FCD6773CC883F9FE0EE4D439340AC592AADB973ED3CF53E2232B3" + "3EF5" + "7CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58CE5AA29652EF" + "FD80" + "AC59CD91416E4E13DBBE"; + + auto hexStringToBinaryString = [](auto const& hex) { + auto blob = ripple::strUnHex(hex); + std::string strBlob; + for (auto c : *blob) + { + strBlob += c; + } + return strBlob; + }; + auto binaryStringToUint256 = [](auto const& bin) -> ripple::uint256 { + ripple::uint256 uint; + return uint.fromVoid((void const*)bin.data()); + }; + auto ledgerInfoToBinaryString = [](auto const& info) { + auto blob = ledgerInfoToBlob(info); + std::string strBlob; + for (auto c : blob) + { + strBlob += c; + } + return strBlob; + }; + + std::string rawHeaderBlob = hexStringToBinaryString(rawHeader); + ripple::LedgerInfo lgrInfo = + deserializeHeader(ripple::makeSlice(rawHeaderBlob)); + + backend->startWrites(); + backend->writeLedger(lgrInfo, std::move(rawHeaderBlob), true); + ASSERT_TRUE(backend->finishWrites(lgrInfo.seq)); + { + auto rng = backend->fetchLedgerRange(); + EXPECT_TRUE(rng.has_value()); + EXPECT_EQ(rng->minSequence, rng->maxSequence); + EXPECT_EQ(rng->maxSequence, lgrInfo.seq); + } + { + auto seq = backend->fetchLatestLedgerSequence(); + EXPECT_TRUE(seq.has_value()); + EXPECT_EQ(*seq, lgrInfo.seq); + } + + { + auto retLgr = backend->fetchLedgerBySequence(lgrInfo.seq); + EXPECT_TRUE(retLgr.has_value()); + EXPECT_EQ(retLgr->seq, lgrInfo.seq); + EXPECT_EQ(ledgerInfoToBlob(lgrInfo), ledgerInfoToBlob(*retLgr)); + } + + EXPECT_FALSE( + backend->fetchLedgerBySequence(lgrInfo.seq + 1).has_value()); + auto lgrInfoOld = lgrInfo; + + auto lgrInfoNext = lgrInfo; + lgrInfoNext.seq = lgrInfo.seq + 1; + lgrInfoNext.parentHash = lgrInfo.hash; + lgrInfoNext.hash++; + lgrInfoNext.accountHash = ~lgrInfo.accountHash; + { + std::string rawHeaderBlob = ledgerInfoToBinaryString(lgrInfoNext); + + backend->startWrites(); + backend->writeLedger(lgrInfoNext, std::move(rawHeaderBlob)); + ASSERT_TRUE(backend->finishWrites(lgrInfoNext.seq)); + } + { + auto rng = backend->fetchLedgerRange(); + EXPECT_TRUE(rng.has_value()); + EXPECT_EQ(rng->minSequence, lgrInfoOld.seq); + EXPECT_EQ(rng->maxSequence, lgrInfoNext.seq); + } + { + auto seq = backend->fetchLatestLedgerSequence(); + EXPECT_EQ(seq, lgrInfoNext.seq); + } 
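+        // Header round-trip: the blob stored for lgrInfoNext should come
+        // back byte-for-byte, and should differ from the previous header.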
+ { + auto retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq); + EXPECT_TRUE(retLgr.has_value()); + EXPECT_EQ(retLgr->seq, lgrInfoNext.seq); + EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext)); + EXPECT_NE(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoOld)); + retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq - 1); + EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoOld)); + + EXPECT_NE(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext)); + retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq - 2); + EXPECT_FALSE(backend->fetchLedgerBySequence(lgrInfoNext.seq - 2) + .has_value()); + + auto txns = backend->fetchAllTransactionsInLedger(lgrInfoNext.seq); + EXPECT_EQ(txns.size(), 0); + auto hashes = + backend->fetchAllTransactionHashesInLedger(lgrInfoNext.seq); + EXPECT_EQ(hashes.size(), 0); + } + + // the below dummy data is not expected to be consistent. The metadata + // string does represent valid metadata. Don't assume though that the + // transaction or its hash correspond to the metadata, or anything like + // that. These tests are purely binary tests to make sure the same data + // that goes in, comes back out + std::string metaHex = + "201C0000001AF8E411006F560A3E08122A05AC91DEFA87052B0554E4A29B46" + "3A27642EBB060B6052196592EEE72200000000240480FDB52503CE1A863300" + "000000000000003400000000000000005529983CBAED30F547471452921C3C" + "6B9F9685F292F6291000EED0A44413AF18C250101AC09600F4B502C8F7F830" + "F80B616DCB6F3970CB79AB70975A05ED5B66860B9564400000001FE217CB65" + "D54B640B31521B05000000000000000000000000434E5900000000000360E3" + "E0751BD9A566CD03FA6CAFC78118B82BA081142252F328CF91263417762570" + "D67220CCB33B1370E1E1E3110064561AC09600F4B502C8F7F830F80B616DCB" + "6F3970CB79AB70975A05ED33DF783681E8365A05ED33DF783681581AC09600" + "F4B502C8F7F830F80B616DCB6F3970CB79AB70975A05ED33DF783681031100" + "0000000000000000000000434E59000000000004110360E3E0751BD9A566CD" + "03FA6CAFC78118B82BA0E1E1E4110064561AC09600F4B502C8F7F830F80B61" + "6DCB6F3970CB79AB70975A05ED5B66860B95E72200000000365A05ED5B6686" + "0B95581AC09600F4B502C8F7F830F80B616DCB6F3970CB79AB70975A05ED5B" + "66860B95011100000000000000000000000000000000000000000211000000" + "00000000000000000000000000000000000311000000000000000000000000" + "434E59000000000004110360E3E0751BD9A566CD03FA6CAFC78118B82BA0E1" + "E1E311006F5647B05E66DE9F3DF2689E8F4CE6126D3136B6C5E79587F9D24B" + "D71A952B0852BAE8240480FDB950101AC09600F4B502C8F7F830F80B616DCB" + "6F3970CB79AB70975A05ED33DF78368164400000033C83A95F65D59D9A6291" + "9C2D18000000000000000000000000434E5900000000000360E3E0751BD9A5" + "66CD03FA6CAFC78118B82BA081142252F328CF91263417762570D67220CCB3" + "3B1370E1E1E511006456AEA3074F10FE15DAC592F8A0405C61FB7D4C98F588" + "C2D55C84718FAFBBD2604AE722000000003100000000000000003200000000" + "0000000058AEA3074F10FE15DAC592F8A0405C61FB7D4C98F588C2D55C8471" + "8FAFBBD2604A82142252F328CF91263417762570D67220CCB33B1370E1E1E5" + "1100612503CE1A8755CE935137F8C6C8DEF26B5CD93BE18105CA83F65E1E90" + "CEC546F562D25957DC0856E0311EB450B6177F969B94DBDDA83E99B7A0576A" + "CD9079573876F16C0C004F06E6240480FDB9624000000005FF0E2BE1E72200" + "000000240480FDBA2D00000005624000000005FF0E1F81142252F328CF9126" + "3417762570D67220CCB33B1370E1E1F1031000"; + std::string txnHex = + "1200072200000000240480FDB920190480FDB5201B03CE1A8964400000033C" + "83A95F65D59D9A62919C2D18000000000000000000000000434E5900000000" + "000360E3E0751BD9A566CD03FA6CAFC78118B82BA068400000000000000C73" + 
"21022D40673B44C82DEE1DDB8B9BB53DCCE4F97B27404DB850F068DD91D685" + "E337EA7446304402202EA6B702B48B39F2197112382838F92D4C02948E9911" + "FE6B2DEBCF9183A426BC022005DAC06CD4517E86C2548A80996019F3AC60A0" + "9EED153BF60C992930D68F09F981142252F328CF91263417762570D67220CC" + "B33B1370"; + std::string hashHex = + "0A81FB3D6324C2DCF73131505C6E4DC67981D7FC39F5E9574CEC4B1F22D28BF7"; + + // this account is not related to the above transaction and metadata + std::string accountHex = + "1100612200000000240480FDBC2503CE1A872D0000000555516931B2AD018EFFBE" + "17C5" + "C9DCCF872F36837C2C6136ACF80F2A24079CF81FD0624000000005FF0E07811422" + "52F3" + "28CF91263417762570D67220CCB33B1370"; + std::string accountIndexHex = + "E0311EB450B6177F969B94DBDDA83E99B7A0576ACD9079573876F16C0C004F06"; + + std::string metaBlob = hexStringToBinaryString(metaHex); + std::string txnBlob = hexStringToBinaryString(txnHex); + std::string hashBlob = hexStringToBinaryString(hashHex); + std::string accountBlob = hexStringToBinaryString(accountHex); + std::string accountIndexBlob = hexStringToBinaryString(accountIndexHex); + std::vector affectedAccounts; + + { + backend->startWrites(); + lgrInfoNext.seq = lgrInfoNext.seq + 1; + lgrInfoNext.txHash = ~lgrInfo.txHash; + lgrInfoNext.accountHash = + lgrInfoNext.accountHash ^ lgrInfoNext.txHash; + lgrInfoNext.parentHash = lgrInfoNext.hash; + lgrInfoNext.hash++; + + ripple::uint256 hash256; + EXPECT_TRUE(hash256.parseHex(hashHex)); + ripple::TxMeta txMeta{hash256, lgrInfoNext.seq, metaBlob}; + auto journal = ripple::debugLog(); + auto accountsSet = txMeta.getAffectedAccounts(journal); + for (auto& a : accountsSet) + { + affectedAccounts.push_back(a); + } + + std::vector accountTxData; + accountTxData.emplace_back(txMeta, hash256, journal); + backend->writeLedger( + lgrInfoNext, std::move(ledgerInfoToBinaryString(lgrInfoNext))); + backend->writeTransaction( + std::move(std::string{hashBlob}), + lgrInfoNext.seq, + std::move(std::string{txnBlob}), + std::move(std::string{metaBlob})); + backend->writeAccountTransactions(std::move(accountTxData)); + backend->writeLedgerObject( + std::move(std::string{accountIndexBlob}), + lgrInfoNext.seq, + std::move(std::string{accountBlob}), + true, + false, + {}); + + ASSERT_TRUE(backend->finishWrites(lgrInfoNext.seq)); + } + + { + auto rng = backend->fetchLedgerRange(); + EXPECT_TRUE(rng); + EXPECT_EQ(rng->minSequence, lgrInfoOld.seq); + EXPECT_EQ(rng->maxSequence, lgrInfoNext.seq); + auto retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq); + EXPECT_TRUE(retLgr); + EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext)); + auto txns = backend->fetchAllTransactionsInLedger(lgrInfoNext.seq); + EXPECT_EQ(txns.size(), 1); + EXPECT_STREQ( + (const char*)txns[0].transaction.data(), + (const char*)txnBlob.data()); + EXPECT_STREQ( + (const char*)txns[0].metadata.data(), + (const char*)metaBlob.data()); + auto hashes = + backend->fetchAllTransactionHashesInLedger(lgrInfoNext.seq); + EXPECT_EQ(hashes.size(), 1); + EXPECT_EQ(ripple::strHex(hashes[0]), hashHex); + for (auto& a : affectedAccounts) + { + auto accountTxns = backend->fetchAccountTransactions(a, 100); + EXPECT_EQ(accountTxns.first.size(), 1); + EXPECT_EQ(accountTxns.first[0], txns[0]); + EXPECT_FALSE(accountTxns.second); + } + + ripple::uint256 key256; + EXPECT_TRUE(key256.parseHex(accountIndexHex)); + auto obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq); + EXPECT_TRUE(obj); + EXPECT_STREQ( + (const char*)obj->data(), (const char*)accountBlob.data()); + obj = 
backend->fetchLedgerObject(key256, lgrInfoNext.seq + 1); + EXPECT_TRUE(obj); + EXPECT_STREQ( + (const char*)obj->data(), (const char*)accountBlob.data()); + obj = backend->fetchLedgerObject(key256, lgrInfoOld.seq - 1); + EXPECT_FALSE(obj); + } + // obtain a time-based seed: + unsigned seed = + std::chrono::system_clock::now().time_since_epoch().count(); + std::string accountBlobOld = accountBlob; + { + backend->startWrites(); + lgrInfoNext.seq = lgrInfoNext.seq + 1; + lgrInfoNext.parentHash = lgrInfoNext.hash; + lgrInfoNext.hash++; + lgrInfoNext.txHash = lgrInfoNext.txHash ^ lgrInfoNext.accountHash; + lgrInfoNext.accountHash = + ~(lgrInfoNext.accountHash ^ lgrInfoNext.txHash); + + backend->writeLedger( + lgrInfoNext, std::move(ledgerInfoToBinaryString(lgrInfoNext))); + std::shuffle( + accountBlob.begin(), + accountBlob.end(), + std::default_random_engine(seed)); + backend->writeLedgerObject( + std::move(std::string{accountIndexBlob}), + lgrInfoNext.seq, + std::move(std::string{accountBlob}), + true, + false, + {}); + + ASSERT_TRUE(backend->finishWrites(lgrInfoNext.seq)); + } + { + auto rng = backend->fetchLedgerRange(); + EXPECT_TRUE(rng); + EXPECT_EQ(rng->minSequence, lgrInfoOld.seq); + EXPECT_EQ(rng->maxSequence, lgrInfoNext.seq); + auto retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq); + EXPECT_TRUE(retLgr); + EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext)); + auto txns = backend->fetchAllTransactionsInLedger(lgrInfoNext.seq); + EXPECT_EQ(txns.size(), 0); + + ripple::uint256 key256; + EXPECT_TRUE(key256.parseHex(accountIndexHex)); + auto obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq); + EXPECT_TRUE(obj); + EXPECT_STREQ( + (const char*)obj->data(), (const char*)accountBlob.data()); + obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq + 1); + EXPECT_TRUE(obj); + EXPECT_STREQ( + (const char*)obj->data(), (const char*)accountBlob.data()); + obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq - 1); + EXPECT_TRUE(obj); + EXPECT_STREQ( + (const char*)obj->data(), (const char*)accountBlobOld.data()); + obj = backend->fetchLedgerObject(key256, lgrInfoOld.seq - 1); + EXPECT_FALSE(obj); + } + + auto generateObjects = [seed]( + size_t numObjects, uint32_t ledgerSequence) { + std::vector> res{numObjects}; + ripple::uint256 key; + key = ledgerSequence * 100000; + + for (auto& blob : res) + { + ++key; + std::string keyStr{(const char*)key.data(), key.size()}; + blob.first = keyStr; + blob.second = std::to_string(ledgerSequence) + keyStr; + } + return res; + }; + auto updateObjects = [](uint32_t ledgerSequence, auto objs) { + for (auto& [key, obj] : objs) + { + obj = std::to_string(ledgerSequence) + obj; + } + return objs; + }; + auto generateTxns = [seed](size_t numTxns, uint32_t ledgerSequence) { + std::vector> res{ + numTxns}; + ripple::uint256 base; + base = ledgerSequence * 100000; + for (auto& blob : res) + { + ++base; + std::string hashStr{(const char*)base.data(), base.size()}; + std::string txnStr = + "tx" + std::to_string(ledgerSequence) + hashStr; + std::string metaStr = + "meta" + std::to_string(ledgerSequence) + hashStr; + blob = std::make_tuple(hashStr, txnStr, metaStr); + } + return res; + }; + auto generateAccounts = [](uint32_t ledgerSequence, + uint32_t numAccounts) { + std::vector accounts; + ripple::AccountID base; + base = ledgerSequence * 998765; + for (size_t i = 0; i < numAccounts; ++i) + { + ++base; + accounts.push_back(base); + } + return accounts; + }; + auto generateAccountTx = [&](uint32_t ledgerSequence, auto txns) { + 
std::vector ret; + auto accounts = generateAccounts(ledgerSequence, 10); + std::srand(std::time(nullptr)); + uint32_t idx = 0; + for (auto& [hash, txn, meta] : txns) + { + AccountTransactionsData data; + data.ledgerSequence = ledgerSequence; + data.transactionIndex = idx; + data.txHash = hash; + for (size_t i = 0; i < 3; ++i) + { + data.accounts.insert( + accounts[std::rand() % accounts.size()]); + } + ++idx; + ret.push_back(data); + } + return ret; + }; + + auto generateNextLedger = [seed](auto lgrInfo) { + ++lgrInfo.seq; + lgrInfo.parentHash = lgrInfo.hash; + std::srand(std::time(nullptr)); + std::shuffle( + lgrInfo.txHash.begin(), + lgrInfo.txHash.end(), + std::default_random_engine(seed)); + std::shuffle( + lgrInfo.accountHash.begin(), + lgrInfo.accountHash.end(), + std::default_random_engine(seed)); + std::shuffle( + lgrInfo.hash.begin(), + lgrInfo.hash.end(), + std::default_random_engine(seed)); + return lgrInfo; + }; + auto writeLedger = + [&](auto lgrInfo, auto txns, auto objs, auto accountTx) { + std::cout << "writing ledger = " << std::to_string(lgrInfo.seq); + backend->startWrites(); + + backend->writeLedger( + lgrInfo, std::move(ledgerInfoToBinaryString(lgrInfo))); + for (auto [hash, txn, meta] : txns) + { + backend->writeTransaction( + std::move(hash), + lgrInfo.seq, + std::move(txn), + std::move(meta)); + } + for (auto [key, obj] : objs) + { + std::optional bookDir; + if (isOffer(obj.data())) + bookDir = getBook(obj); + backend->writeLedgerObject( + std::move(key), + lgrInfo.seq, + std::move(obj), + true, + false, + std::move(bookDir)); + } + backend->writeAccountTransactions(std::move(accountTx)); + + ASSERT_TRUE(backend->finishWrites(lgrInfo.seq)); + }; + + auto checkLedger = [&](auto lgrInfo, + auto txns, + auto objs, + auto accountTx) { + auto rng = backend->fetchLedgerRange(); + auto seq = lgrInfo.seq; + EXPECT_TRUE(rng); + EXPECT_EQ(rng->minSequence, lgrInfoOld.seq); + EXPECT_GE(rng->maxSequence, seq); + auto retLgr = backend->fetchLedgerBySequence(seq); + EXPECT_TRUE(retLgr); + EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfo)); + // retLgr = backend->fetchLedgerByHash(lgrInfo.hash); + // EXPECT_TRUE(retLgr); + // EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfo)); + auto retTxns = backend->fetchAllTransactionsInLedger(seq); + for (auto [hash, txn, meta] : txns) + { + bool found = false; + for (auto [retTxn, retMeta, retSeq] : retTxns) + { + if (std::strncmp( + (const char*)retTxn.data(), + (const char*)txn.data(), + txn.size()) == 0 && + std::strncmp( + (const char*)retMeta.data(), + (const char*)meta.data(), + meta.size()) == 0) + found = true; + } + ASSERT_TRUE(found); + } + for (auto [account, data] : accountTx) + { + std::vector retData; + std::optional cursor; + do + { + uint32_t limit = 10; + auto res = backend->fetchAccountTransactions( + account, limit, cursor); + if (res.second) + EXPECT_EQ(res.first.size(), limit); + retData.insert( + retData.end(), res.first.begin(), res.first.end()); + cursor = res.second; + } while (cursor); + EXPECT_EQ(retData.size(), data.size()); + for (size_t i = 0; i < retData.size(); ++i) + { + auto [txn, meta, seq] = retData[i]; + auto [hash, expTxn, expMeta] = data[i]; + EXPECT_STREQ( + (const char*)txn.data(), (const char*)expTxn.data()); + EXPECT_STREQ( + (const char*)meta.data(), (const char*)expMeta.data()); + } + } + for (auto [key, obj] : objs) + { + auto retObj = + backend->fetchLedgerObject(binaryStringToUint256(key), seq); + if (obj.size()) + { + ASSERT_TRUE(retObj.has_value()); + EXPECT_STREQ( + 
(const char*)obj.data(), (const char*)retObj->data()); + } + else + { + ASSERT_FALSE(retObj.has_value()); + } + } + Backend::LedgerPage page; + std::vector retObjs; + size_t numLoops = 0; + do + { + uint32_t limit = 10; + page = backend->fetchLedgerPage(page.cursor, seq, limit); + if (page.cursor) + EXPECT_EQ(page.objects.size(), limit); + retObjs.insert( + retObjs.end(), page.objects.begin(), page.objects.end()); + ++numLoops; + ASSERT_FALSE(page.warning.has_value()); + } while (page.cursor); + for (auto obj : objs) + { + bool found = false; + bool correct = false; + for (auto retObj : retObjs) + { + if (ripple::strHex(obj.first) == ripple::strHex(retObj.key)) + { + found = true; + ASSERT_EQ( + ripple::strHex(obj.second), + ripple::strHex(retObj.blob)); + } + } + ASSERT_EQ(found, obj.second.size() != 0); + } + }; + + std::map>> + state; + std::map< + uint32_t, + std::vector>> + allTxns; + std::unordered_map> + allTxnsMap; + std:: + map>> + allAccountTx; + std::map lgrInfos; + for (size_t i = 0; i < 10; ++i) + { + lgrInfoNext = generateNextLedger(lgrInfoNext); + auto objs = generateObjects(25, lgrInfoNext.seq); + auto txns = generateTxns(10, lgrInfoNext.seq); + auto accountTx = generateAccountTx(lgrInfoNext.seq, txns); + for (auto rec : accountTx) + { + for (auto account : rec.accounts) + { + allAccountTx[lgrInfoNext.seq][account].push_back( + std::string{ + (const char*)rec.txHash.data(), rec.txHash.size()}); + } + } + EXPECT_EQ(objs.size(), 25); + EXPECT_NE(objs[0], objs[1]); + EXPECT_EQ(txns.size(), 10); + EXPECT_NE(txns[0], txns[1]); + writeLedger(lgrInfoNext, txns, objs, accountTx); + state[lgrInfoNext.seq] = objs; + allTxns[lgrInfoNext.seq] = txns; + lgrInfos[lgrInfoNext.seq] = lgrInfoNext; + for (auto& [hash, txn, meta] : txns) + { + allTxnsMap[hash] = std::make_pair(txn, meta); + } + } + + std::vector> objs; + for (size_t i = 0; i < 10; ++i) + { + lgrInfoNext = generateNextLedger(lgrInfoNext); + if (!objs.size()) + objs = generateObjects(25, lgrInfoNext.seq); + else + objs = updateObjects(lgrInfoNext.seq, objs); + auto txns = generateTxns(10, lgrInfoNext.seq); + auto accountTx = generateAccountTx(lgrInfoNext.seq, txns); + for (auto rec : accountTx) + { + for (auto account : rec.accounts) + { + allAccountTx[lgrInfoNext.seq][account].push_back( + std::string{ + (const char*)rec.txHash.data(), rec.txHash.size()}); + } + } + EXPECT_EQ(objs.size(), 25); + EXPECT_NE(objs[0], objs[1]); + EXPECT_EQ(txns.size(), 10); + EXPECT_NE(txns[0], txns[1]); + writeLedger(lgrInfoNext, txns, objs, accountTx); + state[lgrInfoNext.seq] = objs; + allTxns[lgrInfoNext.seq] = txns; + lgrInfos[lgrInfoNext.seq] = lgrInfoNext; + for (auto& [hash, txn, meta] : txns) + { + allTxnsMap[hash] = std::make_pair(txn, meta); + } + } + std::cout << "WROTE ALL OBJECTS" << std::endl; + auto flatten = [&](uint32_t max) { + std::vector> flat; + std::map objs; + for (auto [seq, diff] : state) + { + for (auto [k, v] : diff) + { + if (seq > max) + { + if (objs.count(k) == 0) + objs[k] = ""; + } + else + { + objs[k] = v; + } + } + } + for (auto [key, value] : objs) + { + flat.push_back(std::make_pair(key, value)); + } + return flat; + }; + + auto flattenAccountTx = [&](uint32_t max) { + std::unordered_map< + ripple::AccountID, + std::vector>> + accountTx; + for (auto [seq, map] : allAccountTx) + { + if (seq > max) + break; + for (auto& [account, hashes] : map) + { + for (auto& hash : hashes) + { + auto& [txn, meta] = allTxnsMap[hash]; + accountTx[account].push_back( + std::make_tuple(hash, txn, meta)); + } + } + } + for (auto& 
[account, data] : accountTx)
+                std::reverse(data.begin(), data.end());
+            return accountTx;
+        };
+
+        for (auto [seq, diff] : state)
+        {
+            std::cout << "flattening" << std::endl;
+            auto flat = flatten(seq);
+            std::cout << "flattened" << std::endl;
+            checkLedger(
+                lgrInfos[seq], allTxns[seq], flat, flattenAccountTx(seq));
+            std::cout << "checked" << std::endl;
+        }
+    }
+}
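A note on the pagination contract this diff establishes: doFetchLedgerPage now
returns a cursor equal to the last key in the page plus one (++(*page.cursor)
in CassandraBackend, ++(*returnCursor) in PostgresBackend), and the key lookup
uses an inclusive bound ("key >=" in both the Postgres SQL and the prepared
Cassandra statement). That is what lets the new fetchSuccessor be expressed as
a one-object page starting at key + 1. Below is a minimal sketch of that
contract, assuming uint64_t keys and a std::map stand in for ripple::uint256
and a real backend; Page, doFetchPage, and successor are illustrative names,
not codebase symbols.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>
    #include <vector>

    // Illustrative stand-ins: the real code pages over ripple::uint256 keys.
    struct Page
    {
        std::vector<std::uint64_t> keys;
        std::optional<std::uint64_t> cursor;  // one past the last key returned
    };

    std::map<std::uint64_t, std::string> store = {
        {1, "a"}, {5, "b"}, {9, "c"}, {12, "d"}};

    // Mirrors doFetchLedgerPage: inclusive lower bound, cursor = last key + 1.
    Page
    doFetchPage(std::optional<std::uint64_t> cursor, std::size_t limit)
    {
        Page page;
        auto it = cursor ? store.lower_bound(*cursor) : store.begin();
        while (it != store.end() && page.keys.size() < limit)
        {
            page.keys.push_back(it->first);
            ++it;
        }
        if (page.keys.size() == limit)
            page.cursor = page.keys.back() + 1;
        return page;
    }

    // Mirrors fetchSuccessor: the first object at or after key + 1.
    std::optional<std::uint64_t>
    successor(std::uint64_t key)
    {
        auto page = doFetchPage(key + 1, 1);
        if (!page.keys.empty())
            return page.keys.front();
        return {};
    }

    int
    main()
    {
        std::optional<std::uint64_t> cursor;
        do
        {
            auto page = doFetchPage(cursor, 2);
            for (auto k : page.keys)
                std::cout << k << ' ';  // prints 1 5, then 9 12
            cursor = page.cursor;
        } while (cursor);  // the final call returns an empty page, no cursor
        std::cout << '\n' << *successor(5) << '\n';  // prints 9
    }

The adjustedLimit doubling (capped at 2048) in BackendInterface::fetchLedgerPage
is only a throughput heuristic layered on top of this; correctness comes from
the inclusive, one-past-the-end cursor, which guarantees that successive calls
neither skip nor repeat a key.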
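Also for reference, the flag-ledger arithmetic that doKeysRepair and
writeKeyFlagLedgerAsync rely on rounds a sequence down to its 2^shift boundary
and steps to the next one, matching the nextFlag expression in
writeKeyFlagLedger. A worked check of that expression follows (keyShift_
defaults to 20; the unit test config sets indexer_key_shift to 2); nextKeyFlag
is an illustrative name, not a codebase symbol.

    #include <cassert>
    #include <cstdint>

    // nextFlag as computed in writeKeyFlagLedger: round the sequence down to
    // its 2^shift boundary, then add 2^shift.
    std::uint32_t
    nextKeyFlag(std::uint32_t seq, std::uint32_t shift)
    {
        return (seq >> shift << shift) + (1u << shift);
    }

    int
    main()
    {
        // indexer_key_shift = 2, as in the unit tests: a flag every 4 ledgers.
        assert(nextKeyFlag(5, 2) == 8);
        assert(nextKeyFlag(8, 2) == 12);  // a flag ledger maps to the next flag
        // keyShift_ = 20, the default: a flag every 1048576 ledgers.
        assert(nextKeyFlag(3000000, 20) == 3145728);
        return 0;
    }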