diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index 4a294ade..aa9fa3d4 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -124,7 +124,6 @@ writeBookFlagLedger( std::unordered_map< ripple::uint256, std::unordered_set> const& books) - { uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift)); ripple::uint256 zero = {}; @@ -133,33 +132,12 @@ writeBookFlagLedger( << " starting. ledgerSequence = " << std::to_string(ledgerSequence) << " nextFlag = " << std::to_string(nextFlag) << " books.size() = " << std::to_string(books.size()); - while (true) - { - try - { - auto [objects, curCursor, warning] = - backend.fetchBookOffers(zero, nextFlag, 1); - if (!warning) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " flag ledger already written. sequence = " - << std::to_string(ledgerSequence) - << " next flag = " << std::to_string(nextFlag) - << "returning"; - return; - } - break; - } - catch (DatabaseTimeout& t) - { - ; - } - } + auto start = std::chrono::system_clock::now(); backend.writeBooks(books, nextFlag, true); backend.writeBooks({{zero, {zero}}}, nextFlag, true); - auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " finished. ledgerSequence = " << std::to_string(ledgerSequence) @@ -448,7 +426,7 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) << " starting. sequence = " << std::to_string(ledgerSequence); bool isFirst = false; uint32_t keyIndex = getKeyIndexOfSeq(ledgerSequence); - uint32_t bookIndex = getKeyIndexOfSeq(ledgerSequence); + uint32_t bookIndex = getBookIndexOfSeq(ledgerSequence); auto rng = backend.fetchLedgerRangeNoThrow(); if (!rng || rng->minSequence == ledgerSequence) { @@ -464,6 +442,7 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) backend.writeKeys({zero}, ledgerSequence); writeBookFlagLedgerAsync(ledgerSequence, backend); writeKeyFlagLedgerAsync(ledgerSequence, backend); + } keys = {}; books = {}; @@ -471,5 +450,5 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) << __func__ << " finished. sequence = " << std::to_string(ledgerSequence); -} // namespace Backend +} } // namespace Backend diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 24dfc642..fc7bfe40 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -47,11 +47,12 @@ #include #include #include +#include static void noticeReceiver(void* arg, PGresult const* res) { - BOOST_LOG_TRIVIAL(info) << "server message: " << PQresultErrorMessage(res); + BOOST_LOG_TRIVIAL(debug) << "server message: " << PQresultErrorMessage(res); } //----------------------------------------------------------------------------- diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index a5547763..addebf4b 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -378,21 +378,11 @@ PostgresBackend::fetchBookOffers( std::uint32_t limit, std::optional const& cursor) const { - auto rng = fetchLedgerRange(); + auto rng = fetchLedgerRange(); if(!rng) return {{},{}}; - uint32_t upper = ledgerSequence; - auto lastPage = rng->maxSequence - (rng->maxSequence % 256); - - if (ledgerSequence != rng->minSequence) - { - upper = (ledgerSequence >> 8) << 8; - if (upper != ledgerSequence) - upper += (1 << 8); - } - ripple::uint256 bookBase = ripple::keylet::quality({ripple::ltDIR_NODE, book}, 0).key; ripple::uint256 bookEnd = ripple::getQualityNext(bookBase); @@ -401,7 +391,27 @@ PostgresBackend::fetchBookOffers( auto getBooks = [this, &bookBase, &bookEnd, &limit](std::uint32_t sequence) -> std::pair> { + BOOST_LOG_TRIVIAL(info) << __func__ << ": Fetching books between " + << "0x" << ripple::strHex(bookBase) << " and " + << "0x" << ripple::strHex(bookEnd) << "at ledger " + << std::to_string(sequence); + + auto start = std::chrono::system_clock::now(); + std::stringstream sql; + sql << "SELECT COUNT(*) FROM books WHERE " + << "book = \'\\x" << ripple::strHex(ripple::uint256(beast::zero)) + << "\' AND ledger_seq = " << std::to_string(sequence); + + bool complete; + PgQuery pgQuery(this->pgPool_); + auto res = pgQuery(sql.str().data()); + if (size_t numRows = checkResult(res, 1)) + complete = res.asInt(0, 0) != 0; + else + return {true, {}}; + + sql.str(""); sql << "SELECT book, offer_key FROM books " << "WHERE ledger_seq = " << std::to_string(sequence) << " AND book >= " @@ -410,26 +420,24 @@ PostgresBackend::fetchBookOffers( << "\'\\x" << ripple::strHex(bookEnd) << "\' " << "ORDER BY book ASC"; - std::cout << "Querying: " << sql.str() << std::endl; - BOOST_LOG_TRIVIAL(debug) << sql.str(); - PgQuery pgQuery(this->pgPool_); - auto res = pgQuery(sql.str().data()); + res = pgQuery(sql.str().data()); + + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + + BOOST_LOG_TRIVIAL(info) << "Postgres book key fetch took " + << std::to_string(duration) + << " seconds"; + if (size_t numRows = checkResult(res, 2)) { - bool complete = false; std::vector results(numRows); for (size_t i = 0; i < numRows; ++i) { auto book = res.asUInt256(i, 0); - auto key = res.asUInt256(i, 1); - if (i == 0 && key.isZero()) - { - complete = true; - continue; - } results.push_back({std::move(book), std::move(key)}); } @@ -451,8 +459,20 @@ PostgresBackend::fetchBookOffers( for (auto const& pair : pairs) keys.push_back(pair.second); + auto start = std::chrono::system_clock::now(); + auto ledgerEntries = fetchLedgerObjects(keys, sequence); + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + + BOOST_LOG_TRIVIAL(info) << "Postgres book objects fetch took " + << std::to_string(duration) + << " seconds. " + << "Fetched " + << std::to_string(ledgerEntries.size()) + << " ledger entries"; + std::vector objects; for (auto i = 0; i < ledgerEntries.size(); ++i) { @@ -468,14 +488,32 @@ PostgresBackend::fetchBookOffers( return {objects, {}}; }; + std::uint32_t bookShift = indexer_.getBookShift(); + auto upper = indexer_.getBookIndexOfSeq(ledgerSequence); + auto [upperComplete, upperResults] = getBooks(upper); - if (upperComplete) - return fetchObjects(upperResults, ledgerSequence, limit); + BOOST_LOG_TRIVIAL(info) << __func__ << ": Upper results found " + << upperResults.size() << " books."; + + if (upperComplete) + { + BOOST_LOG_TRIVIAL(info) << "Upper book page is complete"; + return fetchObjects(upperResults, ledgerSequence, limit); + } + + BOOST_LOG_TRIVIAL(info) << "Upper book page is not complete " + << "fetching again"; + + auto lower = upper - (1 << bookShift); + if (lower < rng->minSequence) + lower = rng->minSequence; - auto lower = (ledgerSequence >> 8) << 8; auto [lowerComplete, lowerResults] = getBooks(lower); + BOOST_LOG_TRIVIAL(info) << __func__ << ": Lower results found " + << lowerResults.size() << " books."; + assert(lowerComplete); std::vector pairs; @@ -610,7 +648,7 @@ PostgresBackend::fetchLedgerObjects( auto res = pgQuery(sql.str().data()); if (size_t numRows = checkResult(res, 1)) { - results[i] = res.asUnHexedBlob(0, 0); + results[i] = res.asUnHexedBlob(); } if (--numRemaining == 0) { @@ -779,7 +817,7 @@ PostgresBackend::writeKeys( numRows++; // If the buffer gets too large, the insert fails. Not sure why. So we // insert after 1 million records - if (numRows == 1000000) + if (numRows == 100000) { pgQuery.bulkInsert("keys", keysBuffer.str()); std::stringstream temp; @@ -803,6 +841,7 @@ PostgresBackend::writeBooks( bool isAsync) const { BOOST_LOG_TRIVIAL(debug) << __func__; + PgQuery pgQuery(pgPool_); pgQuery("BEGIN"); std::stringstream booksBuffer;