diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index c92f131a..4a294ade 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -2,9 +2,11 @@ namespace Backend { BackendIndexer::BackendIndexer(boost::json::object const& config) - : keyShift_(config.at("indexer_key_shift").as_int64()) - , bookShift_(config.at("indexer_book_shift").as_int64()) { + if (config.contains("indexer_key_shift")) + keyShift_ = config.at("indexer_key_shift").as_int64(); + if (config.contains("indexer_book_shift")) + bookShift_ = config.at("indexer_book_shift").as_int64(); work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; }; @@ -103,8 +105,8 @@ writeKeyFlagLedger( } auto start = std::chrono::system_clock::now(); - backend.writeKeys(keys, nextFlag); - backend.writeKeys({zero}, nextFlag); + backend.writeKeys(keys, nextFlag, true); + backend.writeKeys({zero}, nextFlag, true); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ @@ -154,8 +156,8 @@ writeBookFlagLedger( } } auto start = std::chrono::system_clock::now(); - backend.writeBooks(books, nextFlag); - backend.writeBooks({{zero, {zero}}}, nextFlag); + backend.writeBooks(books, nextFlag, true); + backend.writeBooks({{zero, {zero}}}, nextFlag, true); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index e73ed390..0cc7c3ea 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -381,13 +381,15 @@ public: virtual bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const = 0; + uint32_t ledgerSequence, + bool isAsync = false) const = 0; virtual bool writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const = 0; + uint32_t ledgerSequence, + bool isAsync = false) const = 0; virtual ~BackendInterface() { diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 98e0a188..ea6bb082 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -418,7 +418,7 @@ CassandraBackend::fetchLedgerPage( ripple::uint256 zero; statement.bindBytes(zero); } - statement.bindUInt(limit); + statement.bindUInt(limit + 1); CassandraResult result = executeSyncRead(statement); if (!!result) { @@ -430,11 +430,14 @@ CassandraBackend::fetchLedgerPage( { keys.push_back(result.getUInt256()); } while (result.nextRow()); + if (keys.size() && keys.size() == limit) + { + page.cursor = keys.back(); + keys.pop_back(); + } auto objects = fetchLedgerObjects(keys, ledgerSequence); if (objects.size() != keys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); - if (keys.size() == limit) - page.cursor = keys[keys.size() - 1]; if (cursor) BOOST_LOG_TRIVIAL(trace) @@ -449,11 +452,13 @@ CassandraBackend::fetchLedgerPage( page.objects.push_back({std::move(key), std::move(obj)}); } } - if (!keys.size() || (!cursor && !keys[0].isZero())) + if (!cursor && (!keys.size() || !keys[0].isZero())) page.warning = "Data may be incomplete"; return page; } - return {{}, {}, "Data may be incomplete"}; + if (!cursor) + return {{}, {}, "Data may be incomplete"}; + return {}; } std::vector CassandraBackend::fetchLedgerObjects( @@ -496,19 +501,36 @@ CassandraBackend::fetchBookOffers( std::uint32_t limit, std::optional const& cursor) const { - CassandraStatement statement{selectBook_}; - statement.bindBytes(book); auto index = getBookIndexOfSeq(sequence); if (!index) return {}; BOOST_LOG_TRIVIAL(info) << __func__ << " index = " << std::to_string(*index) << " book = " << ripple::strHex(book); + BookOffersPage page; + ripple::uint256 zero = {}; + { + CassandraStatement statement{selectBook_}; + statement.bindBytes(zero); + statement.bindInt(*index); + statement.bindBytes(zero); + statement.bindUInt(1); + CassandraResult result = executeSyncRead(statement); + if (!result) + page.warning = "Data may be incomplete"; + else + { + auto key = result.getUInt256(); + if (!key.isZero()) + page.warning = "Data may be incomplete"; + } + } + CassandraStatement statement{selectBook_}; + statement.bindBytes(book); statement.bindInt(*index); if (cursor) statement.bindBytes(*cursor); else { - ripple::uint256 zero = {}; statement.bindBytes(zero); } statement.bindUInt(limit); @@ -517,11 +539,16 @@ CassandraBackend::fetchBookOffers( BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; std::vector keys; if (!result) - return {{}, {}, "Data may be incomplete"}; + return page; do { keys.push_back(result.getUInt256()); } while (result.nextRow()); + if (keys.size() && keys.size() == limit) + { + page.cursor = keys.back(); + keys.pop_back(); + } BOOST_LOG_TRIVIAL(debug) << __func__ << " - populated keys. num keys = " << keys.size(); @@ -532,20 +559,10 @@ CassandraBackend::fetchBookOffers( for (size_t i = 0; i < objs.size(); ++i) { if (objs[i].size() != 0) - results.push_back({keys[i], objs[i]}); + page.offers.push_back({keys[i], objs[i]}); } - std::optional warning; - if (!cursor && !keys[0].isZero()) - warning = "Data may be incomplete"; - if (keys.size() == limit) - return {results, keys[keys.size() - 1], warning}; - else - return {results, {}, warning}; } - else if (!cursor) - return {{}, {}, "Data may be incomplete"}; - - return {}; + return page; } struct WriteBookCallbackData { @@ -704,7 +721,8 @@ writeKeyCallback(CassFuture* fut, void* cbData) bool CassandraBackend::writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) @@ -716,7 +734,8 @@ CassandraBackend::writeKeys( std::mutex mtx; std::vector> cbs; cbs.reserve(keys.size()); - uint32_t concurrentLimit = indexerMaxRequestsOutstanding; + uint32_t concurrentLimit = + isAsync ? indexerMaxRequestsOutstanding : keys.size(); uint32_t numSubmitted = 0; for (auto& key : keys) { @@ -755,7 +774,8 @@ CassandraBackend::writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) @@ -763,7 +783,8 @@ CassandraBackend::writeBooks( std::condition_variable cv; std::mutex mtx; std::vector> cbs; - uint32_t concurrentLimit = indexerMaxRequestsOutstanding; + uint32_t concurrentLimit = + isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; std::atomic_uint32_t numOutstanding = 0; size_t count = 0; auto start = std::chrono::system_clock::now(); @@ -1507,7 +1528,7 @@ CassandraBackend::open(bool readOnly) query = {}; query << "SELECT key FROM " << tablePrefix << "books2 " << " WHERE book = ? AND sequence = ? AND " - " key > ? " + " key >= ? " " ORDER BY key ASC LIMIT ?"; if (!selectBook_.prepareStatement(query, session_.get())) continue; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 7913fe6e..6beb6860 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -985,13 +985,15 @@ public: bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const; + uint32_t ledgerSequence, + bool isAsync = false) const; bool writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const override; BookOffersPage fetchBookOffers( ripple::uint256 const& book, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index af3d9c18..e3c6d03f 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -337,7 +337,7 @@ PostgresBackend::fetchLedgerPage( std::stringstream sql; sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index); if (cursor) - sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; + sql << " AND key > \'\\x" << ripple::strHex(*cursor) << "\'"; sql << " ORDER BY key ASC LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); auto res = pgQuery(sql.str().data()); @@ -378,14 +378,37 @@ PostgresBackend::fetchBookOffers( std::uint32_t limit, std::optional const& cursor) const { + auto index = getBookIndexOfSeq(ledgerSequence); + if (!index) + return {}; PgQuery pgQuery(pgPool_); + ripple::uint256 zero = {}; + std::optional warning; + { + std::stringstream sql; + sql << "SELECT offer_key FROM books WHERE book = " + << "\'\\x" << ripple::strHex(zero) + << "\' AND ledger_seq = " << std::to_string(*index); + auto res = pgQuery(sql.str().data()); + sql << " ORDER BY offer_key ASC" + << " LIMIT " << std::to_string(limit); + if (size_t numRows = checkResult(res, 1)) + { + auto key = res.asUInt256(0, 0); + if (!key.isZero()) + warning = "Data may be incomplete"; + } + else + warning = "Data may be incomplete"; + } + std::stringstream sql; sql << "SELECT offer_key FROM books WHERE book = " << "\'\\x" << ripple::strHex(book) - << "\' AND ledger_seq = " << std::to_string(ledgerSequence); + << "\' AND ledger_seq = " << std::to_string(*index); if (cursor) - sql << " AND offer_key < \'\\x" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY offer_key DESC, ledger_seq DESC" + sql << " AND offer_key > \'\\x" << ripple::strHex(*cursor) << "\'"; + sql << " ORDER BY offer_key ASC" << " LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << sql.str(); auto res = pgQuery(sql.str().data()); @@ -396,9 +419,6 @@ PostgresBackend::fetchBookOffers( { keys.push_back(res.asUInt256(i, 0)); } - std::optional warning; - if (keys[0].isZero()) - warning = "Data may be incomplete"; std::vector blobs = fetchLedgerObjects(keys, ledgerSequence); std::vector results; @@ -421,7 +441,7 @@ PostgresBackend::fetchBookOffers( else return {results, {}, warning}; } - return {{}, {}}; + return {{}, {}, warning}; } std::vector @@ -697,7 +717,8 @@ PostgresBackend::doFinishWrites() const bool PostgresBackend::writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); @@ -731,7 +752,8 @@ PostgresBackend::writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + bool isAsync) const { BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 84835417..a50d37c6 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -117,13 +117,15 @@ public: bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const override; bool writeBooks( std::unordered_map< ripple::uint256, std::unordered_set> const& books, - uint32_t ledgerSequence) const override; + uint32_t ledgerSequence, + bool isAsync = false) const override; }; } // namespace Backend #endif