diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 8bb4efeb..9a9a3020 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -369,9 +369,11 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const objects.push_back({result.getUInt256(), result.getBytes()}); } while (result.nextRow()); auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) << __func__ << " Fetched diff. Fetch time = " - << std::to_string((mid - start).count() / 1000000000.0) - << " . total time = " << std::to_string((end-start).count() / 1000000000.0); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " Fetched diff. Fetch time = " + << std::to_string((mid - start).count() / 1000000000.0) + << " . total time = " + << std::to_string((end - start).count() / 1000000000.0); return objects; } LedgerPage @@ -531,6 +533,131 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } +std::pair, std::optional> +CassandraBackend::fetchBookOffers( + ripple::uint256 const& book, + uint32_t sequence, + std::uint32_t limit, + std::optional const& cursor) const +{ + CassandraStatement statement{selectBook_}; + statement.bindBytes(book); + uint32_t upper = (sequence >> 8) << 8; + if (upper != sequence) + upper += (1 << 8); + statement.bindInt(upper); + if (cursor) + statement.bindBytes(*cursor); + else + { + ripple::uint256 zero = {}; + statement.bindBytes(zero); + } + statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + + BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; + std::vector keys; + if (!result) + return {{}, {}}; + do + { + keys.push_back(result.getUInt256()); + } while (result.nextRow()); + + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - populated keys. num keys = " << keys.size(); + if (keys.size()) + { + std::vector results; + std::vector objs = fetchLedgerObjects(keys, sequence); + for (size_t i = 0; i < objs.size(); ++i) + { + if (objs[i].size() != 0) + results.push_back({keys[i], objs[i]}); + } + return {results, results[results.size() - 1].key}; + } + + return {{}, {}}; +} +struct WriteBookCallbackData +{ + CassandraBackend const& backend; + ripple::uint256 book; + ripple::uint256 offerKey; + uint32_t ledgerSequence; + std::condition_variable& cv; + std::atomic_uint32_t& numRemaining; + std::mutex& mtx; + uint32_t currentRetries = 0; + WriteBookCallbackData( + CassandraBackend const& backend, + ripple::uint256 const& book, + ripple::uint256 const& offerKey, + uint32_t ledgerSequence, + std::condition_variable& cv, + std::mutex& mtx, + std::atomic_uint32_t& numRemaining) + : backend(backend) + , book(book) + , offerKey(offerKey) + , ledgerSequence(ledgerSequence) + , cv(cv) + , mtx(mtx) + , numRemaining(numRemaining) + + { + } +}; +void +writeBookCallback(CassFuture* fut, void* cbData); +void +writeBook2(WriteBookCallbackData& cb) +{ + CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; + statement.bindBytes(cb.book); + statement.bindInt(cb.ledgerSequence); + statement.bindBytes(cb.offerKey); + // Passing isRetry as true bypasses incrementing numOutstanding + cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true); +} +void +writeBookCallback(CassFuture* fut, void* cbData) +{ + WriteBookCallbackData& requestParams = + *static_cast(cbData); + + CassandraBackend const& backend = requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert key error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.getIOContext(), + std::chrono::steady_clock::now() + wait); + timer->async_wait( + [timer, &requestParams](const boost::system::error_code& error) { + writeBook2(requestParams); + }); + } + else + { + BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request"; + { + std::lock_guard lck(requestParams.mtx); + --requestParams.numRemaining; + requestParams.cv.notify_one(); + } + } +} struct WriteKeyCallbackData { @@ -626,19 +753,20 @@ CassandraBackend::writeKeys( cbs.push_back(std::make_shared( *this, key, ledgerSequence, cv, mtx, numRemaining)); writeKey(*cbs.back()); - ++numSubmitted; + ++numSubmitted; BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { - BOOST_LOG_TRIVIAL(trace) - << std::to_string(numSubmitted) << " " << std::to_string(numRemaining) - << " " << std::to_string(keys.size()) << " " - << std::to_string(concurrentLimit); + BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " " + << std::to_string(numRemaining) << " " + << std::to_string(keys.size()) << " " + << std::to_string(concurrentLimit); // keys.size() - i is number submitted. keys.size() - // numRemaining is number completed Difference is num // outstanding - return (numSubmitted - (keys.size() - numRemaining)) < concurrentLimit; + return (numSubmitted - (keys.size() - numRemaining)) < + concurrentLimit; }); if (numSubmitted % 100000 == 0) BOOST_LOG_TRIVIAL(info) @@ -656,35 +784,61 @@ bool CassandraBackend::writeBooks( std::unordered_map>& books, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + uint32_t numOffers) const { - std::unordered_map sizes; - size_t numOffers = 0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " Ledger = " << std::to_string(ledgerSequence) + << " . num books = " << std::to_string(books.size()); + std::atomic_uint32_t numRemaining = numOffers; + std::condition_variable cv; + std::mutex mtx; + std::vector> cbs; + uint32_t concurrentLimit = maxRequestsOutstanding / 2; + uint32_t numSubmitted = 0; + auto start = std::chrono::system_clock::now(); for (auto& book : books) { for (auto& offer : book.second) { - if (sizes.count(book.first) > 0) - sizes[book.first]++; - else - sizes[book.first] = 1; - ++numOffers; + cbs.push_back(std::make_shared( + *this, + book.first, + offer, + ledgerSequence, + cv, + mtx, + numRemaining)); + writeBook2(*cbs.back()); + ++numSubmitted; + BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; + std::unique_lock lck(mtx); + BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; + cv.wait( + lck, + [&numRemaining, numSubmitted, concurrentLimit, numOffers]() { + BOOST_LOG_TRIVIAL(trace) + << std::to_string(numSubmitted) << " " + << std::to_string(numRemaining) << " " + << std::to_string(numOffers) << " " + << std::to_string(concurrentLimit); + return (numSubmitted - (numOffers - numRemaining)) < + concurrentLimit; + }); + if (numSubmitted % 1000 == 0) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Submitted " << std::to_string(numSubmitted) + << " write requests. Completed " + << (numOffers - numRemaining); } } - size_t maxSize = 0; - for (auto& book : sizes) - { - if(book.second > maxSize) - maxSize = book.second; - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Book = " << ripple::strHex(book.first) - << " . num offers = " << book.second; - } - BOOST_LOG_TRIVIAL(info) - << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) - << " . total offers = " << std::to_string(numOffers) - << " . total books = " << std::to_string(books.size()) - << " . max book size = " << std::to_string(maxSize); + BOOST_LOG_TRIVIAL(info) << __func__ + << "Submitted all book writes. Waiting for them to " + "finish. num submitted = " + << std::to_string(numSubmitted); + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books"; return true; } @@ -716,7 +870,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const { auto [objects, curCursor] = fetchLedgerPage2(cursor, ledgerSequence, limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) { @@ -749,7 +903,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << " num books = " << books.size() << " num offers = " << numOffers << " . Took " << (mid - start).count() / 1000000000.0; writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence); + writeBooks(books, ledgerSequence, numOffers); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << "Wrote all keys from ledger " @@ -761,9 +915,11 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const uint32_t prevBooksLedgerSequence = ledgerSequence; uint32_t nextLedgerSequence = ((prevLedgerSequence >> indexerShift_) << indexerShift_); - BOOST_LOG_TRIVIAL(info) << __func__ << " next base = " << std::to_string(nextLedgerSequence); - nextLedgerSequence += (1 << indexerShift_); - BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence); + BOOST_LOG_TRIVIAL(info) + << __func__ << " next base = " << std::to_string(nextLedgerSequence); + nextLedgerSequence += (1 << indexerShift_); + BOOST_LOG_TRIVIAL(info) + << __func__ << " next = " << std::to_string(nextLedgerSequence); if (nextLedgerSequence == prevLedgerSequence) { nextLedgerSequence += (1 << indexerShift_); @@ -832,44 +988,45 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const ++nextOffers; } } - // books are written every 256 ledgers - if(i % 256 == 0) - { + // books are written every 256 ledgers + if (i % 256 == 0) + { + // Iterate through books from previous flag ledger, copying over + // any that still exist + for (auto& book : books) + { + std::vector offerKeys; + for (auto& offerKey : book.second) + { + offerKeys.push_back(offerKey); + } - // Iterate through books from previous flag ledger, copying over any - // that still exist - for (auto& book : books) - { - std::vector offerKeys; - for (auto& offerKey : book.second) - { - offerKeys.push_back(offerKey); - } - - auto offers = fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); - for (size_t i = 0; i < offerKeys.size(); ++i) - { - auto& offer = offers[i]; - // if the offer was deleted prior to prevLedgerSequence, don't - // copy - if (offer.size() != 0) - { - auto book = getBook(offer); - if (nextBooks[book].insert(offerKeys[i]).second) - ++nextOffers; - } - else - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping deleted offer"; - - } - } - } - writeBooks(nextBooks, i); - prevBooksLedgerSequence = i; - books = std::move(nextBooks); - nextBooks = {}; - } + auto offers = + fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); + for (size_t i = 0; i < offerKeys.size(); ++i) + { + auto& offer = offers[i]; + // if the offer was deleted prior to prevLedgerSequence, + // don't copy + if (offer.size() != 0) + { + auto book = getBook(offer); + if (nextBooks[book].insert(offerKeys[i]).second) + ++nextOffers; + } + else + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " skipping deleted offer"; + } + } + } + writeBooks(nextBooks, i, nextOffers); + prevBooksLedgerSequence = i; + books = std::move(nextBooks); + nextBooks = {}; + nextOffers = 0; + } } end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) @@ -878,8 +1035,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << " shift width = " << std::to_string(indexerShift_) << ". num keys = " << keys.size() << " . Took " << (end - start).count() / 1000000000.0 - << " prev ledger = " - << std::to_string(prevLedgerSequence); + << " prev ledger = " << std::to_string(prevLedgerSequence); writeKeys(keys, nextLedgerSequence); prevLedgerSequence = nextLedgerSequence; nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); @@ -1364,6 +1520,13 @@ CassandraBackend::open() " ORDER BY key ASC LIMIT ? ALLOW FILTERING"; if (!getBook_.prepareStatement(query, session_.get())) continue; + query = {}; + query << "SELECT key FROM " << tablePrefix << "books2 " + << " WHERE book = ? AND sequence = ? AND " + " key > ? " + " ORDER BY key ASC LIMIT ?"; + if (!selectBook_.prepareStatement(query, session_.get())) + continue; query = {}; query << " INSERT INTO " << tablePrefix << "account_tx" diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 09b64fe1..15f91d08 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -604,6 +604,7 @@ private: CassandraPreparedStatement insertKey_; CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; + CassandraPreparedStatement selectBook_; CassandraPreparedStatement insertBook_; CassandraPreparedStatement insertBook2_; CassandraPreparedStatement deleteBook_; @@ -694,6 +695,11 @@ public: { return insertKey_; } + CassandraPreparedStatement const& + getInsertBookPreparedStatement() const + { + return insertBook2_; + } std::pair< std::vector, @@ -954,14 +960,21 @@ public: std::unordered_map< ripple::uint256, std::unordered_set>& books, - uint32_t ledgerSequence) const; - + uint32_t ledgerSequence, + uint32_t numOffers) const; std::pair, std::optional> fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, std::uint32_t limit, - std::optional const& cursor) const override + std::optional const& cursor) const override; + + std::pair, std::optional> + fetchBookOffers2( + ripple::uint256 const& book, + uint32_t sequence, + std::uint32_t limit, + std::optional const& cursor) const { CassandraStatement statement{getBook_}; statement.bindBytes(book); diff --git a/test.py b/test.py index 4291275c..eb724841 100755 --- a/test.py +++ b/test.py @@ -547,7 +547,7 @@ def run(args): print(compareAccountTx(res,res2)) elif args.action == "ledger_data": res = asyncio.get_event_loop().run_until_complete( - ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary)) + ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary, args.cursor)) if args.verify: writeLedgerData(res,args.filename) elif args.action == "ledger_data_full":