diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 1c21da28..c2982b1d 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -357,8 +357,10 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const CassandraStatement statement{selectLedgerDiff_}; statement.bindInt(ledgerSequence); + auto start = std::chrono::system_clock::now(); CassandraResult result = executeSyncRead(statement); + auto mid = std::chrono::system_clock::now(); if (!result) return {}; std::vector objects; @@ -366,6 +368,10 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const { objects.push_back({result.getUInt256(), result.getBytes()}); } while (result.nextRow()); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " Fetched diff. Fetch time = " + << std::to_string((mid - start).count() / 1000000000.0) + << " . total time = " << std::to_string((end-start).count() / 1000000000.0); return objects; } LedgerPage @@ -644,7 +650,7 @@ CassandraBackend::writeKeys( std::unique_lock lck(mtx); cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); */ -} + } bool CassandraBackend::writeBooks( @@ -658,22 +664,27 @@ CassandraBackend::writeBooks( { for (auto& offer : book.second) { - if (sizes.count(offer)) + if (sizes.count(book.first) > 0) sizes[book.first]++; else sizes[book.first] = 1; ++numOffers; } } - BOOST_LOG_TRIVIAL(info) - << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) - << " . total offers = " << std::to_string(numOffers); + size_t maxSize = 0; for (auto& book : sizes) { - BOOST_LOG_TRIVIAL(info) + if(book.second > maxSize) + maxSize = book.second; + BOOST_LOG_TRIVIAL(debug) << __func__ << " Book = " << ripple::strHex(book.first) << " . num offers = " << book.second; } + BOOST_LOG_TRIVIAL(info) + << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) + << " . total offers = " << std::to_string(numOffers) + << " . total books = " << std::to_string(books.size()) + << " . max book size = " << std::to_string(maxSize); return true; } @@ -705,6 +716,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const { auto [objects, curCursor] = fetchLedgerPage2(cursor, ledgerSequence, limit); + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) { @@ -746,10 +758,13 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << ". Entire operation took " << (end - start).count() / 1000000000.0; uint32_t prevLedgerSequence = ledgerSequence; + uint32_t prevBooksLedgerSequence = ledgerSequence; uint32_t nextLedgerSequence = - ((prevLedgerSequence >> indexerShift_) << indexerShift_) + - (1 << indexerShift_); - if (nextLedgerSequence = prevLedgerSequence) + ((prevLedgerSequence >> indexerShift_) << indexerShift_); + BOOST_LOG_TRIVIAL(info) << __func__ << " next base = " << std::to_string(nextLedgerSequence); + nextLedgerSequence += (1 << indexerShift_); + BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence); + if (nextLedgerSequence == prevLedgerSequence) { nextLedgerSequence += (1 << indexerShift_); } @@ -765,7 +780,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const nextBooks; size_t nextOffers = 0; start = std::chrono::system_clock::now(); - for (size_t i = ledgerSequence + 1; i < nextLedgerSequence; ++i) + for (size_t i = prevLedgerSequence + 1; i <= nextLedgerSequence; ++i) { // Get the diff and update keys auto objs = fetchLedgerDiff(i); @@ -792,7 +807,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const } } } - BOOST_LOG_TRIVIAL(info) << __func__; // For any deleted keys, check if they are offer objects std::vector deletedKeys{ deleted.begin(), deleted.end()}; @@ -818,6 +832,44 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const ++nextOffers; } } + // books are written every 256 ledgers + if(i % 256 == 0) + { + + // Iterate through books from previous flag ledger, copying over any + // that still exist + for (auto& book : books) + { + std::vector offerKeys; + for (auto& offerKey : book.second) + { + offerKeys.push_back(offerKey); + } + + auto offers = fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); + for (size_t i = 0; i < offerKeys.size(); ++i) + { + auto& offer = offers[i]; + // if the offer was deleted prior to prevLedgerSequence, don't + // copy + if (offer.size() != 0) + { + auto book = getBook(offer); + if (nextBooks[book].insert(offerKeys[i]).second) + ++nextOffers; + } + else + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping deleted offer"; + + } + } + } + writeBooks(nextBooks, i); + prevBooksLedgerSequence = i; + books = std::move(nextBooks); + nextBooks = {}; + } } end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) @@ -825,36 +877,12 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << std::to_string(nextLedgerSequence) << " shift width = " << std::to_string(indexerShift_) << ". num keys = " << keys.size() << " . Took " - << (end - start).count() / 1000000000.0; - // Iterate through books from previous flag ledger, copying over any - // that still exist - for (auto& book : books) - { - std::vector offerKeys; - for (auto& offerKey : book.second) - { - offerKeys.push_back(offerKey); - } - - auto offers = fetchLedgerObjects(offerKeys, prevLedgerSequence); - for (size_t i = 0; i < offerKeys.size(); ++i) - { - auto& offer = offers[i]; - // if the offer was deleted prior to prevLedgerSequence, don't - // copy - if (offer.size() != 0) - { - auto book = getBook(offerKeys[i]); - if (nextBooks[book].insert(offerKeys[i]).second) - ++nextOffers; - } - } - } - writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence); + << (end - start).count() / 1000000000.0 + << " prev ledger = " + << std::to_string(prevLedgerSequence); + writeKeys(keys, nextLedgerSequence); prevLedgerSequence = nextLedgerSequence; nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); - books = nextBooks; } return true; }