From 2eec383b358e75db0dc32d234d9523b7ff912e21 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 2 Jul 2021 15:05:46 +0000 Subject: [PATCH] cleanup --- src/backend/BackendInterface.cpp | 3 +- src/backend/CassandraBackend.cpp | 52 +++++++++++++++++++++++++++----- src/backend/CassandraBackend.h | 36 ++-------------------- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 29cd2610..b1a26ff1 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -205,6 +205,7 @@ BackendInterface::fetchLedgerPage( uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4)); LedgerPage page; page.cursor = cursor; + int numCalls = 0; do { adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2; @@ -223,7 +224,7 @@ BackendInterface::fetchLedgerPage( page.objects.insert( page.objects.end(), partial.objects.begin(), partial.objects.end()); page.cursor = partial.cursor; - } while (page.objects.size() < limit && page.cursor); + } while (page.objects.size() < limit && page.cursor && ++numCalls < 10); if (incomplete) { auto rng = fetchLedgerRange(); diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index d41085b3..dd794d6a 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -301,13 +301,52 @@ CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); return fetchTransactions(hashes); } +std::vector +CassandraBackend::fetchTransactions( + std::vector const& hashes) const +{ + std::size_t const numHashes = hashes.size(); + std::atomic_uint32_t numFinished = 0; + std::condition_variable cv; + std::mutex mtx; + std::vector results{numHashes}; + std::vector> cbs; + cbs.reserve(numHashes); + auto start = std::chrono::system_clock::now(); + for (std::size_t i = 0; i < hashes.size(); ++i) + { + cbs.push_back(std::make_shared( + *this, hashes[i], results[i], cv, numFinished, numHashes)); + read(*cbs[i]); + } + assert(results.size() == cbs.size()); + + std::unique_lock lck(mtx); + cv.wait( + lck, [&numFinished, &numHashes]() { return numFinished == numHashes; }); + auto end = std::chrono::system_clock::now(); + for (auto const& res : results) + { + if (res.transaction.size() == 1 && res.transaction[0] == 0) + throw DatabaseTimeout(); + } + + BOOST_LOG_TRIVIAL(debug) + << "Fetched " << numHashes << " transactions from Cassandra in " + << std::chrono::duration_cast(end - start) + .count() + << " milliseconds"; + return results; +} std::vector CassandraBackend::fetchAllTransactionHashesInLedger( uint32_t ledgerSequence) const { CassandraStatement statement{selectAllTransactionHashesInLedger_}; statement.bindInt(ledgerSequence); + auto start = std::chrono::system_clock::now(); CassandraResult result = executeSyncRead(statement); + auto end = std::chrono::system_clock::now(); if (!result) { BOOST_LOG_TRIVIAL(error) @@ -320,6 +359,12 @@ CassandraBackend::fetchAllTransactionHashesInLedger( { hashes.push_back(result.getUInt256()); } while (result.nextRow()); + BOOST_LOG_TRIVIAL(debug) + << "Fetched " << hashes.size() + << " transaction hashes from Cassandra in " + << std::chrono::duration_cast(end - start) + .count() + << " milliseconds"; return hashes; } @@ -1568,13 +1613,6 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); - query << "SELECT transaction, metadata, ledger_sequence FROM " - << tablePrefix << "transactions" - << " WHERE ledger_sequence = ?"; - if (!selectAllTransactionsInLedger_.prepareStatement( - query, session_.get())) - continue; - query.str(""); query << "SELECT hash FROM " << tablePrefix << "ledger_transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionHashesInLedger_.prepareStatement( diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index f78f1b88..a7a7b9b2 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -635,7 +635,6 @@ private: CassandraPreparedStatement insertTransaction_; CassandraPreparedStatement insertLedgerTransaction_; CassandraPreparedStatement selectTransaction_; - CassandraPreparedStatement selectAllTransactionsInLedger_; CassandraPreparedStatement selectAllTransactionHashesInLedger_; CassandraPreparedStatement selectObject_; CassandraPreparedStatement selectLedgerPageKeys_; @@ -1059,39 +1058,8 @@ public: }; std::vector - fetchTransactions(std::vector const& hashes) const override - { - std::size_t const numHashes = hashes.size(); - BOOST_LOG_TRIVIAL(debug) - << "Fetching " << numHashes << " transactions from Cassandra"; - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::vector results{numHashes}; - std::vector> cbs; - cbs.reserve(numHashes); - for (std::size_t i = 0; i < hashes.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, hashes[i], results[i], cv, numFinished, numHashes)); - read(*cbs[i]); - } - assert(results.size() == cbs.size()); - - std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &numHashes]() { - return numFinished == numHashes; - }); - for (auto const& res : results) - { - if (res.transaction.size() == 1 && res.transaction[0] == 0) - throw DatabaseTimeout(); - } - - BOOST_LOG_TRIVIAL(debug) - << "Fetched " << numHashes << " transactions from Cassandra"; - return results; - } + fetchTransactions( + std::vector const& hashes) const override; void read(ReadCallbackData& data) const