From 43374e55bd338e40eb962cfadc95a9feb8000b8d Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 24 Jun 2021 18:29:19 +0000 Subject: [PATCH 01/11] simulate x ledgers at once --- src/backend/CassandraBackend.cpp | 6 ++++-- src/etl/ReportingETL.cpp | 34 +++++++++++++++++++++++++++++--- src/server/HttpBase.h | 2 -- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index b6bf4ef2..d9a7382d 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -1,6 +1,6 @@ -#include #include #include +#include #include /* namespace std { @@ -268,7 +268,9 @@ CassandraBackend::fetchAllTransactionHashesInLedger( CassandraResult result = executeSyncRead(statement); if (!result) { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; + BOOST_LOG_TRIVIAL(error) + << __func__ + << " - no rows . ledger = " << std::to_string(ledgerSequence); return {}; } std::vector hashes; diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 87ab9cb4..6828db91 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -26,9 +26,9 @@ #include #include #include -#include #include #include +#include #include #include @@ -245,7 +245,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); assert(lgr); publishLedger(*lgr); - + return true; } } @@ -253,7 +253,6 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { continue; } - } return false; } @@ -516,6 +515,30 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) beast::setCurrentThreadName("rippled: ReportingETL transform"); uint32_t currentSequence = startSequence; + int counter = 0; + std::atomic_int per = 1; + auto startTimer = [this, &per]() { + std::cout << "calling outer"; + auto innerFunc = [this, &per](auto& f) -> void { + std::cout << "calling inner"; + std::shared_ptr timer = + std::make_shared( + ioContext_, + std::chrono::steady_clock::now() + + std::chrono::seconds(30)); + timer->async_wait( + [timer, f, &per](const boost::system::error_code& error) { + std::cout << "***incrementing per*****"; + ++per; + if (per > 100) + per = 100; + f(f); + }); + }; + innerFunc(innerFunc); + }; + startTimer(); + while (!writeConflict) { std::optional fetchResponse{ @@ -570,6 +593,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = false; }); } + if (++counter >= per) + { + std::this_thread::sleep_for(std::chrono::seconds(4)); + counter = 0; + } } }}; diff --git a/src/server/HttpBase.h b/src/server/HttpBase.h index 0a4ab512..44f420fd 100644 --- a/src/server/HttpBase.h +++ b/src/server/HttpBase.h @@ -174,8 +174,6 @@ handle_request( wsStyleRequest["command"] = request["method"]; - std::cout << "Transfromed to ws style stuff" << std::endl; - auto [builtResponse, cost] = buildResponse(wsStyleRequest, backend, nullptr, balancer, nullptr); From 416a3cc5232920c897742fa1d35ef610f4a3ff09 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 25 Jun 2021 17:36:20 +0000 Subject: [PATCH 02/11] various fixes --- src/backend/CassandraBackend.cpp | 2 +- src/etl/ReportingETL.cpp | 39 ++++++++++++++++++++++++-------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index d9a7382d..6815f2b5 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -744,7 +744,7 @@ CassandraBackend::writeKeys( std::vector> cbs; cbs.reserve(keys.size()); uint32_t concurrentLimit = - isAsync ? indexerMaxRequestsOutstanding : keys.size(); + isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; BOOST_LOG_TRIVIAL(debug) << __func__ << " Ledger = " << std::to_string(index.keyIndex) << " . num keys = " << std::to_string(keys.size()) diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 6828db91..35eff2ae 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -173,9 +173,22 @@ ReportingETL::getFees(std::uint32_t seq) void ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { - auto ledgerRange = backend_->fetchLedgerRange(); + auto ledgerRange = backend_->fetchLedgerRangeNoThrow(); + auto fees = getFees(lgrInfo.seq); - auto transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); + std::vector transactions; + while (true) + { + try + { + transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); + break; + } + catch (Backend::DatabaseTimeout const&) + { + BOOST_LOG_TRIVIAL(warning) << "Read timeout fetching transactions"; + } + } if (!fees || !ledgerRange) { @@ -516,20 +529,19 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) uint32_t currentSequence = startSequence; int counter = 0; - std::atomic_int per = 1; + std::atomic_int per = 4; auto startTimer = [this, &per]() { - std::cout << "calling outer"; auto innerFunc = [this, &per](auto& f) -> void { - std::cout << "calling inner"; std::shared_ptr timer = std::make_shared( ioContext_, std::chrono::steady_clock::now() + - std::chrono::seconds(30)); + std::chrono::minutes(5)); timer->async_wait( [timer, f, &per](const boost::system::error_code& error) { - std::cout << "***incrementing per*****"; ++per; + BOOST_LOG_TRIVIAL(info) + << "Incremented per to " << std::to_string(per); if (per > 100) per = 100; f(f); @@ -538,6 +550,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) innerFunc(innerFunc); }; startTimer(); + auto begin = std::chrono::system_clock::now(); while (!writeConflict) { @@ -576,7 +589,9 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) // success is false if the ledger was already written if (success) { - publishLedger(lgrInfo); + boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { + publishLedger(lgrInfo); + }); lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; @@ -595,8 +610,14 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) } if (++counter >= per) { - std::this_thread::sleep_for(std::chrono::seconds(4)); + std::chrono::milliseconds sleep = + std::chrono::duration_cast( + std::chrono::seconds(4) - (end - begin)); + BOOST_LOG_TRIVIAL(info) << "Sleeping for " << sleep.count() + << " . per = " << std::to_string(per); + std::this_thread::sleep_for(sleep); counter = 0; + begin = std::chrono::system_clock::now(); } } }}; From 9f8724b7ab866708dfd1a41b7e1b4a1980bff48c Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 29 Jun 2021 17:33:42 +0000 Subject: [PATCH 03/11] write transaction hashes to separate table --- src/backend/CassandraBackend.cpp | 88 ++++++++++++++++++++++++++------ src/backend/CassandraBackend.h | 18 ++----- src/etl/ReportingETL.cpp | 7 ++- test.py | 17 ++++-- 4 files changed, 94 insertions(+), 36 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 6815f2b5..d41085b3 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -51,6 +51,13 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) delete &requestParams; } } +template +void +processAsyncWrite(CassFuture* fut, void* cbData) +{ + T& requestParams = *static_cast(cbData); + processAsyncWriteResponse(requestParams, fut, requestParams.retry); +} // Process the result of an asynchronous write. Retry on error // @param fut cassandra future associated with the write // @param cbData struct that holds the request parameters @@ -217,6 +224,54 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } +template +struct CallbackData +{ + CassandraBackend const* backend; + T data; + F retry; + uint32_t currentRetries; + std::atomic refs = 1; + + CallbackData(CassandraBackend const* b, T&& d, F f) + : backend(b), data(std::move(d)), retry(f) + { + } +}; + +void +CassandraBackend::writeTransaction( + std::string&& hash, + uint32_t seq, + std::string&& transaction, + std::string&& metadata) const +{ + BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; + std::string hashCpy = hash; + auto func = [this](auto& params, bool retry) { + CassandraStatement statement{insertLedgerTransaction_}; + statement.bindInt(params.data.first); + statement.bindBytes(params.data.second); + executeAsyncWrite( + statement, + processAsyncWrite< + typename std::remove_reference::type>, + params, + retry); + }; + auto* lgrSeqToHash = + new CallbackData(this, std::make_pair(seq, std::move(hashCpy)), func); + WriteTransactionCallbackData* data = new WriteTransactionCallbackData( + this, + std::move(hash), + seq, + std::move(transaction), + std::move(metadata)); + + writeTransaction(*data, false); + func(*lgrSeqToHash, false); +} + std::optional CassandraBackend::fetchLedgerRange() const { @@ -243,21 +298,8 @@ CassandraBackend::fetchLedgerRange() const std::vector CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const { - CassandraStatement statement{selectAllTransactionsInLedger_}; - statement.bindInt(ledgerSequence); - CassandraResult result = executeSyncRead(statement); - if (!result) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; - return {}; - } - std::vector txns; - do - { - txns.push_back( - {result.getBytes(), result.getBytes(), result.getUInt32()}); - } while (result.nextRow()); - return txns; + auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); + return fetchTransactions(hashes); } std::vector CassandraBackend::fetchAllTransactionHashesInLedger( @@ -1373,6 +1415,14 @@ CassandraBackend::open(bool readOnly) << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; + query.str(""); + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix + << "ledger_transactions" + << " ( ledger_sequence bigint, hash blob, PRIMARY " + "KEY(ledger_sequence, hash))" + << " WITH default_time_to_live = " << std::to_string(ttl); + if (!executeSimpleStatement(query.str())) + continue; query.str(""); query << "SELECT * FROM " << tablePrefix << "transactions" @@ -1483,6 +1533,12 @@ CassandraBackend::open(bool readOnly) "?, ?)"; if (!insertTransaction_.prepareStatement(query, session_.get())) continue; + query.str(""); + query << "INSERT INTO " << tablePrefix << "ledger_transactions" + << " (ledger_sequence, hash) VALUES " + "(?, ?)"; + if (!insertLedgerTransaction_.prepareStatement(query, session_.get())) + continue; query.str(""); query << "INSERT INTO " << tablePrefix << "keys" @@ -1519,7 +1575,7 @@ CassandraBackend::open(bool readOnly) query, session_.get())) continue; query.str(""); - query << "SELECT hash FROM " << tablePrefix << "transactions" + query << "SELECT hash FROM " << tablePrefix << "ledger_transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionHashesInLedger_.prepareStatement( query, session_.get())) diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 4f958530..f78f1b88 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -26,13 +26,13 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include namespace Backend { @@ -633,6 +633,7 @@ private: // than making a new statement CassandraPreparedStatement insertObject_; CassandraPreparedStatement insertTransaction_; + CassandraPreparedStatement insertLedgerTransaction_; CassandraPreparedStatement selectTransaction_; CassandraPreparedStatement selectAllTransactionsInLedger_; CassandraPreparedStatement selectAllTransactionHashesInLedger_; @@ -1319,18 +1320,7 @@ public: std::string&& hash, uint32_t seq, std::string&& transaction, - std::string&& metadata) const override - { - BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; - WriteTransactionCallbackData* data = new WriteTransactionCallbackData( - this, - std::move(hash), - seq, - std::move(transaction), - std::move(metadata)); - - writeTransaction(*data, false); - } + std::string&& metadata) const override; void startWrites() const override diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 35eff2ae..0cb2a7a7 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -529,7 +529,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) uint32_t currentSequence = startSequence; int counter = 0; - std::atomic_int per = 4; + std::atomic_int per = 100; auto startTimer = [this, &per]() { auto innerFunc = [this, &per](auto& f) -> void { std::shared_ptr timer = @@ -549,7 +549,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) }; innerFunc(innerFunc); }; - startTimer(); + // startTimer(); + auto begin = std::chrono::system_clock::now(); while (!writeConflict) @@ -608,6 +609,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = false; }); } + /* if (++counter >= per) { std::chrono::milliseconds sleep = @@ -619,6 +621,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) counter = 0; begin = std::chrono::system_clock::now(); } + */ } }}; diff --git a/test.py b/test.py index 9b821a46..07a45cf6 100755 --- a/test.py +++ b/test.py @@ -699,11 +699,14 @@ async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls start = datetime.datetime.now().timestamp() await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":True, "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) - print(res["header"]["blob"]) end = datetime.datetime.now().timestamp() if (end - start) > 0.1: - print("request took more than 100ms") + print("request took more than 100ms : " + str(end - start)) numCalls = numCalls + 1 + if "error" in res: + print(res["error"]) + else: + print(res["header"]["blob"]) except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -842,8 +845,13 @@ args = parser.parse_args() def run(args): asyncio.set_event_loop(asyncio.new_event_loop()) - if(args.ledger is None): - args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] + rng =asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) + if args.ledger is None: + args.ledger = rng[1] + if args.maxLedger == -1: + args.maxLedger = rng[1] + if args.minLedger == -1: + args.minLedger = rng[0] if args.action == "fee": asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port)) elif args.action == "server_info": @@ -891,6 +899,7 @@ def run(args): end = datetime.datetime.now().timestamp() num = int(args.numRunners) * int(args.numCalls) print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + print("Latency = " + str((end - start) / int(args.numCalls)) + " seconds") elif args.action == "ledger_entries": keys = [] ledger_index = 0 From 2eec383b358e75db0dc32d234d9523b7ff912e21 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 2 Jul 2021 15:05:46 +0000 Subject: [PATCH 04/11] cleanup --- src/backend/BackendInterface.cpp | 3 +- src/backend/CassandraBackend.cpp | 52 +++++++++++++++++++++++++++----- src/backend/CassandraBackend.h | 36 ++-------------------- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 29cd2610..b1a26ff1 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -205,6 +205,7 @@ BackendInterface::fetchLedgerPage( uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4)); LedgerPage page; page.cursor = cursor; + int numCalls = 0; do { adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2; @@ -223,7 +224,7 @@ BackendInterface::fetchLedgerPage( page.objects.insert( page.objects.end(), partial.objects.begin(), partial.objects.end()); page.cursor = partial.cursor; - } while (page.objects.size() < limit && page.cursor); + } while (page.objects.size() < limit && page.cursor && ++numCalls < 10); if (incomplete) { auto rng = fetchLedgerRange(); diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index d41085b3..dd794d6a 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -301,13 +301,52 @@ CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); return fetchTransactions(hashes); } +std::vector +CassandraBackend::fetchTransactions( + std::vector const& hashes) const +{ + std::size_t const numHashes = hashes.size(); + std::atomic_uint32_t numFinished = 0; + std::condition_variable cv; + std::mutex mtx; + std::vector results{numHashes}; + std::vector> cbs; + cbs.reserve(numHashes); + auto start = std::chrono::system_clock::now(); + for (std::size_t i = 0; i < hashes.size(); ++i) + { + cbs.push_back(std::make_shared( + *this, hashes[i], results[i], cv, numFinished, numHashes)); + read(*cbs[i]); + } + assert(results.size() == cbs.size()); + + std::unique_lock lck(mtx); + cv.wait( + lck, [&numFinished, &numHashes]() { return numFinished == numHashes; }); + auto end = std::chrono::system_clock::now(); + for (auto const& res : results) + { + if (res.transaction.size() == 1 && res.transaction[0] == 0) + throw DatabaseTimeout(); + } + + BOOST_LOG_TRIVIAL(debug) + << "Fetched " << numHashes << " transactions from Cassandra in " + << std::chrono::duration_cast(end - start) + .count() + << " milliseconds"; + return results; +} std::vector CassandraBackend::fetchAllTransactionHashesInLedger( uint32_t ledgerSequence) const { CassandraStatement statement{selectAllTransactionHashesInLedger_}; statement.bindInt(ledgerSequence); + auto start = std::chrono::system_clock::now(); CassandraResult result = executeSyncRead(statement); + auto end = std::chrono::system_clock::now(); if (!result) { BOOST_LOG_TRIVIAL(error) @@ -320,6 +359,12 @@ CassandraBackend::fetchAllTransactionHashesInLedger( { hashes.push_back(result.getUInt256()); } while (result.nextRow()); + BOOST_LOG_TRIVIAL(debug) + << "Fetched " << hashes.size() + << " transaction hashes from Cassandra in " + << std::chrono::duration_cast(end - start) + .count() + << " milliseconds"; return hashes; } @@ -1568,13 +1613,6 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); - query << "SELECT transaction, metadata, ledger_sequence FROM " - << tablePrefix << "transactions" - << " WHERE ledger_sequence = ?"; - if (!selectAllTransactionsInLedger_.prepareStatement( - query, session_.get())) - continue; - query.str(""); query << "SELECT hash FROM " << tablePrefix << "ledger_transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionHashesInLedger_.prepareStatement( diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index f78f1b88..a7a7b9b2 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -635,7 +635,6 @@ private: CassandraPreparedStatement insertTransaction_; CassandraPreparedStatement insertLedgerTransaction_; CassandraPreparedStatement selectTransaction_; - CassandraPreparedStatement selectAllTransactionsInLedger_; CassandraPreparedStatement selectAllTransactionHashesInLedger_; CassandraPreparedStatement selectObject_; CassandraPreparedStatement selectLedgerPageKeys_; @@ -1059,39 +1058,8 @@ public: }; std::vector - fetchTransactions(std::vector const& hashes) const override - { - std::size_t const numHashes = hashes.size(); - BOOST_LOG_TRIVIAL(debug) - << "Fetching " << numHashes << " transactions from Cassandra"; - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::vector results{numHashes}; - std::vector> cbs; - cbs.reserve(numHashes); - for (std::size_t i = 0; i < hashes.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, hashes[i], results[i], cv, numFinished, numHashes)); - read(*cbs[i]); - } - assert(results.size() == cbs.size()); - - std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &numHashes]() { - return numFinished == numHashes; - }); - for (auto const& res : results) - { - if (res.transaction.size() == 1 && res.transaction[0] == 0) - throw DatabaseTimeout(); - } - - BOOST_LOG_TRIVIAL(debug) - << "Fetched " << numHashes << " transactions from Cassandra"; - return results; - } + fetchTransactions( + std::vector const& hashes) const override; void read(ReadCallbackData& data) const From 6ed91a4f7b7b865eeacce721f5b3b8ca55eebea0 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 2 Jul 2021 19:30:17 +0000 Subject: [PATCH 05/11] fix gcc build issue --- deps/cassandra.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/cassandra.cmake b/deps/cassandra.cmake index 80f2ade5..d34d7c79 100644 --- a/deps/cassandra.cmake +++ b/deps/cassandra.cmake @@ -44,7 +44,7 @@ if(NOT cassandra) GIT_REPOSITORY https://github.com/krb5/krb5.git GIT_TAG master UPDATE_COMMAND "" - CONFIGURE_COMMAND autoreconf src && ./src/configure --enable-static --disable-shared + CONFIGURE_COMMAND autoreconf src && CFLAGS=-fcommon ./src/configure --enable-static --disable-shared BUILD_IN_SOURCE 1 BUILD_COMMAND make INSTALL_COMMAND "" @@ -106,7 +106,7 @@ if(NOT cassandra) -DLIBUV_ROOT_DIR=${BINARY_DIR} -DLIBUV_INCLUDE_DIR=${SOURCE_DIR}/include -DCASS_BUILD_STATIC=ON - -DCASS_BUILD_SHARED=OFF + -DCASS_BUILD_SHARED=OFF INSTALL_COMMAND "" BUILD_BYPRODUCTS /${CMAKE_STATIC_LIBRARY_PREFIX}cassandra_static.a ) From 8684dba694f585406c97ef3a1f914aada8c6997b Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 9 Jul 2021 19:54:04 +0000 Subject: [PATCH 06/11] checkpoint --- src/backend/CassandraBackend.cpp | 247 ++++++++++++++++++++++++++----- src/backend/CassandraBackend.h | 74 +-------- 2 files changed, 217 insertions(+), 104 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index dd794d6a..971d907b 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -45,10 +45,7 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) { BOOST_LOG_TRIVIAL(trace) << __func__ << " Succesfully inserted a record"; - backend.finishAsyncWrite(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; + requestParams.finish(); } } template @@ -58,6 +55,7 @@ processAsyncWrite(CassFuture* fut, void* cbData) T& requestParams = *static_cast(cbData); processAsyncWriteResponse(requestParams, fut, requestParams.retry); } +/* // Process the result of an asynchronous write. Retry on error // @param fut cassandra future associated with the write // @param cbData struct that holds the request parameters @@ -72,7 +70,7 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) processAsyncWriteResponse(requestParams, fut, func); } - +*/ /* void @@ -141,6 +139,7 @@ flatMapGetCreatedCallback(CassFuture* fut, void* cbData) } } */ +/* void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) { @@ -182,6 +181,7 @@ flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData) }; processAsyncWriteResponse(requestParams, fut, func); } +*/ // Process the result of an asynchronous read. Retry on error // @param fut cassandra future associated with the read @@ -224,21 +224,185 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } -template +template struct CallbackData { CassandraBackend const* backend; T data; - F retry; + std::function&, bool)> retry; uint32_t currentRetries; std::atomic refs = 1; - CallbackData(CassandraBackend const* b, T&& d, F f) - : backend(b), data(std::move(d)), retry(f) + CallbackData(CassandraBackend const* b, T&& d, B bind) + : backend(b), data(std::move(d)) + { + retry = [bind, this](auto& params, bool isRetry) { + auto statement = bind(params); + backend->executeAsyncWrite( + statement, + processAsyncWrite< + typename std::remove_reference::type>, + params, + isRetry); + }; + } + virtual void + start() + { + retry(*this, false); + } + + virtual void + finish() + { + backend->finishAsyncWrite(); + int remaining = --refs; + if (remaining == 0) + delete this; + } + virtual ~CallbackData() + { + } +}; +template +struct BulkWriteCallbackData : public CallbackData +{ + std::mutex& mtx; + std::condition_variable& cv; + std::atomic_int& numRemaining; + BulkWriteCallbackData( + CassandraBackend const* b, + T&& d, + B bind, + std::atomic_int& r, + std::mutex& m, + std::condition_variable& c) + : CallbackData(b, std::move(d), bind) + , numRemaining(r) + , mtx(m) + , cv(c) + { + } + void + start() override + { + this->retry(*this, true); + } + + void + finish() override + { + { + std::lock_guard lck(mtx); + --numRemaining; + cv.notify_one(); + } + } + ~BulkWriteCallbackData() { } }; +template +void +makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind) +{ + auto* cb = new CallbackData(b, std::move(d), bind); + cb->start(); +} +template +std::shared_ptr> +makeAndExecuteBulkAsyncWrite( + CassandraBackend const* b, + T&& d, + B bind, + std::atomic_int& r, + std::mutex& m, + std::condition_variable& c) +{ + auto cb = std::make_shared>( + b, std::move(d), bind, r, m, c); + cb->start(); + return cb; +} +void +CassandraBackend::doWriteLedgerObject( + std::string&& key, + uint32_t seq, + std::string&& blob, + bool isCreated, + bool isDeleted, + std::optional&& book) const +{ + BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(std::move(key), seq, std::move(blob))), + [this](auto& params) { + auto& [key, sequence, blob] = params.data; + + CassandraStatement statement{insertObject_}; + statement.bindBytes(key); + statement.bindInt(sequence); + statement.bindBytes(blob); + return statement; + }); +} +void +CassandraBackend::writeLedger( + ripple::LedgerInfo const& ledgerInfo, + std::string&& header, + bool isFirst) const +{ + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(ledgerInfo.seq, std::move(header))), + [this](auto& params) { + auto& [sequence, header] = params.data; + CassandraStatement statement{insertLedgerHeader_}; + statement.bindInt(sequence); + statement.bindBytes(header); + return statement; + }); + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(ledgerInfo.hash, ledgerInfo.seq)), + [this](auto& params) { + auto& [hash, sequence] = params.data; + CassandraStatement statement{insertLedgerHash_}; + statement.bindBytes(hash); + statement.bindInt(sequence); + return statement; + }); + ledgerSequence_ = ledgerInfo.seq; + isFirstLedger_ = isFirst; +} +void +CassandraBackend::writeAccountTransactions( + std::vector&& data) const +{ + for (auto& record : data) + { + for (auto& account : record.accounts) + { + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple( + std::move(account), + record.ledgerSequence, + record.transactionIndex, + record.txHash)), + [this](auto& params) { + CassandraStatement statement(insertAccountTx_); + auto& [account, lgrSeq, txnIdx, hash] = params.data; + statement.bindBytes(account); + statement.bindIntTuple(lgrSeq, txnIdx); + statement.bindBytes(hash); + return statement; + }); + } + } +} void CassandraBackend::writeTransaction( std::string&& hash, @@ -248,28 +412,27 @@ CassandraBackend::writeTransaction( { BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; std::string hashCpy = hash; - auto func = [this](auto& params, bool retry) { - CassandraStatement statement{insertLedgerTransaction_}; - statement.bindInt(params.data.first); - statement.bindBytes(params.data.second); - executeAsyncWrite( - statement, - processAsyncWrite< - typename std::remove_reference::type>, - params, - retry); - }; - auto* lgrSeqToHash = - new CallbackData(this, std::make_pair(seq, std::move(hashCpy)), func); - WriteTransactionCallbackData* data = new WriteTransactionCallbackData( - this, - std::move(hash), - seq, - std::move(transaction), - std::move(metadata)); - writeTransaction(*data, false); - func(*lgrSeqToHash, false); + makeAndExecuteAsyncWrite( + this, std::move(std::make_pair(seq, hash)), [this](auto& params) { + CassandraStatement statement{insertLedgerTransaction_}; + statement.bindInt(params.data.first); + statement.bindBytes(params.data.second); + return statement; + }); + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple( + std::move(hash), seq, std::move(transaction), std::move(metadata))), + [this](auto& params) { + CassandraStatement statement{insertTransaction_}; + auto& [hash, sequence, transaction, metadata] = params.data; + statement.bindBytes(hash); + statement.bindInt(sequence); + statement.bindBytes(transaction); + statement.bindBytes(metadata); + return statement; + }); } std::optional @@ -825,10 +988,20 @@ CassandraBackend::writeKeys( KeyIndex const& index, bool isAsync) const { - std::atomic_uint32_t numRemaining = keys.size(); + auto bind = [this](auto& params) { + auto& [lgrSeq, key] = params.data; + CassandraStatement statement{insertKey_}; + statement.bindInt(lgrSeq); + statement.bindBytes(key); + return statement; + }; + std::atomic_int numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; - std::vector> cbs; + std::vector, + typename std::remove_reference::type>>> + cbs; cbs.reserve(keys.size()); uint32_t concurrentLimit = isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; @@ -840,9 +1013,13 @@ CassandraBackend::writeKeys( uint32_t numSubmitted = 0; for (auto& key : keys) { - cbs.push_back(std::make_shared( - *this, key, index.keyIndex, cv, mtx, numRemaining)); - writeKey(*cbs.back()); + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_pair(index.keyIndex, std::move(key)), + bind, + numRemaining, + mtx, + cv)); ++numSubmitted; BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index a7a7b9b2..1a0ce7a4 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -869,18 +869,7 @@ public: writeLedger( ripple::LedgerInfo const& ledgerInfo, std::string&& header, - bool isFirst = false) const override - { - WriteLedgerHeaderCallbackData* headerCb = - new WriteLedgerHeaderCallbackData( - this, ledgerInfo.seq, std::move(header)); - WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData( - this, ledgerInfo.hash, ledgerInfo.seq); - writeLedgerHeader(*headerCb, false); - writeLedgerHash(*hashCb, false); - ledgerSequence_ = ledgerInfo.seq; - isFirstLedger_ = isFirst; - } + bool isFirst = false) const override; void writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const { @@ -1174,6 +1163,7 @@ public: } }; + /* void write(WriteCallbackData& data, bool isRetry) const { @@ -1185,7 +1175,7 @@ public: executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry); } - } + }*/ void doWriteLedgerObject( @@ -1194,53 +1184,11 @@ public: std::string&& blob, bool isCreated, bool isDeleted, - std::optional&& book) const override - { - BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; - bool hasBook = book.has_value(); - WriteCallbackData* data = new WriteCallbackData( - this, - std::move(key), - seq, - std::move(blob), - isCreated, - isDeleted, - std::move(book)); - - write(*data, false); - } + std::optional&& book) const override; void writeAccountTransactions( - std::vector&& data) const override - { - for (auto& record : data) - { - for (auto& account : record.accounts) - { - WriteAccountTxCallbackData* cbData = - new WriteAccountTxCallbackData( - this, - std::move(account), - record.ledgerSequence, - record.transactionIndex, - std::move(record.txHash)); - writeAccountTx(*cbData, false); - } - } - } - - void - writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const - { - CassandraStatement statement(insertAccountTx_); - statement.bindBytes(data.account); - statement.bindIntTuple(data.ledgerSequence, data.transactionIndex); - statement.bindBytes(data.txHash); - - executeAsyncWrite( - statement, flatMapWriteAccountTxCallback, data, isRetry); - } + std::vector&& data) const override; struct WriteTransactionCallbackData { @@ -1271,18 +1219,6 @@ public: } }; - void - writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const - { - CassandraStatement statement{insertTransaction_}; - statement.bindBytes(data.hash); - statement.bindInt(data.sequence); - statement.bindBytes(data.transaction); - statement.bindBytes(data.metadata); - - executeAsyncWrite( - statement, flatMapWriteTransactionCallback, data, isRetry); - } void writeTransaction( std::string&& hash, From 607ea0a76e4ee3e81ae68492477370d332c535b6 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 9 Jul 2021 23:45:01 +0000 Subject: [PATCH 07/11] refactor --- src/backend/CassandraBackend.cpp | 784 +++---------------------------- src/backend/CassandraBackend.h | 209 +------- 2 files changed, 65 insertions(+), 928 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 971d907b..19fe7c25 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -55,134 +55,31 @@ processAsyncWrite(CassFuture* fut, void* cbData) T& requestParams = *static_cast(cbData); processAsyncWriteResponse(requestParams, fut, requestParams.retry); } + /* -// Process the result of an asynchronous write. Retry on error -// @param fut cassandra future associated with the write -// @param cbData struct that holds the request parameters +template void -flatMapWriteCallback(CassFuture* fut, void* cbData) +processAsyncRead(CassFuture* fut, void* cbData) { - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->write(params, retry); - }; - - processAsyncWriteResponse(requestParams, fut, func); -} -*/ -/* - -void -retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry) -{ - auto const& backend = *requestParams.backend; - if (requestParams.isDeleted) - backend.writeDeletedKey(requestParams, true); - else - backend.writeKey(requestParams, true); -} - -void -flatMapWriteKeyCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - processAsyncWriteResponse(requestParams, fut, retryWriteKey); -} - -void -flatMapGetCreatedCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); + T& requestParams = *static_cast(cbData); + CassError rc = cass_future_error_code(fut); if (rc != CASS_OK) - BOOST_LOG_TRIVIAL(info) << __func__; { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeKey(requestParams, true); - }); + requestParams.result = {}; } else { - auto finish = [&backend]() { - --(backend.numRequestsOutstanding_); + CassandraResult result = + std::move(CassandraResult(cass_future_get_result(fut))); + requestParams.populate(result); - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - }; - CassandraResult result{cass_future_get_result(fut)}; - - if (!result) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc - << ", " << cass_error_desc(rc); - finish(); - return; - } - requestParams.createdSequence = result.getUInt32(); - backend.writeDeletedKey(requestParams, false); + std::lock_guard lck(requestParams.mtx); + size_t batchSize = requestParams.batchSize; + if (++(requestParams_.numFinished) == batchSize) + requestParams_.cv.notify_all(); } } */ -/* -void -flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteTransactionCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeTransaction(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -void -flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteAccountTxCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeAccountTx(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -void -flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteLedgerHeaderCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeLedgerHeader(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} - -void -flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteLedgerHashCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeLedgerHash(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -*/ - // Process the result of an asynchronous read. Retry on error // @param fut cassandra future associated with the read // @param cbData struct that holds the request parameters @@ -224,6 +121,26 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } +/* +template +struct ReadCallbackData +{ + using Finisher = std::function; + T data; + CassandraBackend const* backend; + Finisher finish; + ReadCallbackData(CassandraBackend const* backend, T&& d, Finisher f) + : backend(b), data(d), finish(f) + { + } + + void + finish(CassandraResult& res) + { + finish(res) + } +}; +*/ template struct CallbackData { @@ -292,11 +209,10 @@ struct BulkWriteCallbackData : public CallbackData void finish() override { - { - std::lock_guard lck(mtx); - --numRemaining; - cv.notify_one(); - } + // TODO: it would be nice to avoid this lock. + std::lock_guard lck(mtx); + --numRemaining; + cv.notify_one(); } ~BulkWriteCallbackData() { @@ -531,120 +447,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger( return hashes; } -struct ReadDiffCallbackData -{ - CassandraBackend const& backend; - uint32_t sequence; - std::vector& result; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadDiffCallbackData( - CassandraBackend const& backend, - uint32_t sequence, - std::vector& result, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, - size_t batchSize) - : backend(backend) - , sequence(sequence) - , result(result) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } -}; - -void -flatMapReadDiffCallback(CassFuture* fut, void* cbData); -void -readDiff(ReadDiffCallbackData& data) -{ - CassandraStatement statement{ - data.backend.getSelectLedgerDiffPreparedStatement()}; - statement.bindInt(data.sequence); - - data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data); -} -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadDiffCallback(CassFuture* fut, void* cbData) -{ - ReadDiffCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { readDiff(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func, true}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - do - { - requestParams.result.push_back( - {result.getUInt256(), result.getBytes()}); - } while (result.nextRow()); - } -} -std::map> -CassandraBackend::fetchLedgerDiffs(std::vector const& sequences) const -{ - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::map> results; - std::vector> cbs; - cbs.reserve(sequences.size()); - for (std::size_t i = 0; i < sequences.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, - sequences[i], - results[sequences[i]], - cv, - numFinished, - sequences.size())); - readDiff(*cbs[i]); - } - assert(results.size() == cbs.size()); - - std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &sequences]() { - return numFinished == sequences.size(); - }); - - return results; -} - -std::vector -CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const -{ - CassandraStatement statement{selectLedgerDiff_}; - statement.bindInt(ledgerSequence); - - auto start = std::chrono::system_clock::now(); - CassandraResult result = executeSyncRead(statement); - - auto mid = std::chrono::system_clock::now(); - if (!result) - return {}; - std::vector objects; - do - { - objects.push_back({result.getUInt256(), result.getBytes()}); - } while (result.nextRow()); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Fetched diff. Fetch time = " - << std::to_string((mid - start).count() / 1000000000.0) - << " . total time = " - << std::to_string((end - start).count() / 1000000000.0); - return objects; -} LedgerPage CassandraBackend::doFetchLedgerPage( std::optional const& cursor, @@ -746,241 +548,6 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } -struct WriteBookCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 book; - ripple::uint256 offerKey; - uint32_t ledgerSequence; - std::condition_variable& cv; - std::atomic_uint32_t& numOutstanding; - std::mutex& mtx; - uint32_t currentRetries = 0; - WriteBookCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& book, - ripple::uint256 const& offerKey, - uint32_t ledgerSequence, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numOutstanding) - : backend(backend) - , book(book) - , offerKey(offerKey) - , ledgerSequence(ledgerSequence) - , cv(cv) - , mtx(mtx) - , numOutstanding(numOutstanding) - - { - } -}; -void -writeBookCallback(CassFuture* fut, void* cbData); -void -writeBook(WriteBookCallbackData& cb) -{ - CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; - statement.bindBytes(cb.book.data(), 24); - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.book.data() + 24, 8); - statement.bindBytes(cb.offerKey); - // Passing isRetry as true bypasses incrementing numOutstanding - cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true); -} -void -writeBookCallback(CassFuture* fut, void* cbData) -{ - WriteBookCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert book error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - writeBook(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numOutstanding; - requestParams.cv.notify_one(); - } - } -} - -struct WriteKeyCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 key; - uint32_t ledgerSequence; - std::condition_variable& cv; - std::atomic_uint32_t& numRemaining; - std::mutex& mtx; - uint32_t currentRetries = 0; - WriteKeyCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& key, - uint32_t ledgerSequence, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numRemaining) - : backend(backend) - , key(key) - , ledgerSequence(ledgerSequence) - , cv(cv) - , mtx(mtx) - , numRemaining(numRemaining) - - { - } -}; -struct OnlineDeleteCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 key; - uint32_t ledgerSequence; - std::vector object; - std::condition_variable& cv; - std::atomic_uint32_t& numOutstanding; - std::mutex& mtx; - uint32_t currentRetries = 0; - OnlineDeleteCallbackData( - CassandraBackend const& backend, - ripple::uint256&& key, - uint32_t ledgerSequence, - std::vector&& object, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numOutstanding) - : backend(backend) - , key(std::move(key)) - , ledgerSequence(ledgerSequence) - , object(std::move(object)) - , cv(cv) - , mtx(mtx) - , numOutstanding(numOutstanding) - - { - } -}; -void -onlineDeleteCallback(CassFuture* fut, void* cbData); -void -onlineDelete(OnlineDeleteCallbackData& cb) -{ - { - CassandraStatement statement{ - cb.backend.getInsertObjectPreparedStatement()}; - statement.bindBytes(cb.key); - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.object); - - cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true); - } -} -void -onlineDeleteCallback(CassFuture* fut, void* cbData) -{ - OnlineDeleteCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert book error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - onlineDelete(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numOutstanding; - requestParams.cv.notify_one(); - } - } -} -void -writeKeyCallback(CassFuture* fut, void* cbData); -void -writeKey(WriteKeyCallbackData& cb) -{ - CassandraStatement statement{cb.backend.getInsertKeyPreparedStatement()}; - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.key); - // Passing isRetry as true bypasses incrementing numOutstanding - cb.backend.executeAsyncWrite(statement, writeKeyCallback, cb, true); -} -void -writeKeyCallback(CassFuture* fut, void* cbData) -{ - WriteKeyCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert key error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - writeKey(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numRemaining; - requestParams.cv.notify_one(); - } - } -} bool CassandraBackend::writeKeys( @@ -995,7 +562,7 @@ CassandraBackend::writeKeys( statement.bindBytes(key); return statement; }; - std::atomic_int numRemaining = keys.size(); + std::atomic_int numOutstanding = keys.size(); std::condition_variable cv; std::mutex mtx; std::vector lck(mtx); - BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { - BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " " - << std::to_string(numRemaining) << " " - << std::to_string(keys.size()) << " " - << std::to_string(concurrentLimit); + cv.wait(lck, [&numOutstanding, concurrentLimit, &keys]() { // keys.size() - i is number submitted. keys.size() - // numRemaining is number completed Difference is num // outstanding - return (numSubmitted - (keys.size() - numRemaining)) < - concurrentLimit; + return numOutstanding < concurrentLimit; }); if (numSubmitted % 100000 == 0) BOOST_LOG_TRIVIAL(debug) - << __func__ << " Submitted " << std::to_string(numSubmitted) - << " write requests. Completed " - << (keys.size() - numRemaining); + << __func__ << " Submitted " << std::to_string(numSubmitted); } std::unique_lock lck(mtx); - cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); return true; } -bool -CassandraBackend::isIndexed(uint32_t ledgerSequence) const -{ - return false; - /* - auto rng = fetchLedgerRange(); - if (!rng) - return false; - if (ledgerSequence != rng->minSequence && - ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_)) - ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) + - (1 << indexerShift_); - CassandraStatement statement{selectKeys_}; - statement.bindInt(ledgerSequence); - ripple::uint256 zero; - statement.bindBytes(zero); - statement.bindUInt(1); - CassandraResult result = executeSyncRead(statement); - return !!result; - */ -} - -std::optional -CassandraBackend::getNextToIndex() const -{ - return {}; - /* - auto rng = fetchLedgerRange(); - if (!rng) - return {}; - uint32_t cur = rng->minSequence; - while (isIndexed(cur)) - { - cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_); - } - return cur; - */ -} - -bool -CassandraBackend::runIndexer(uint32_t ledgerSequence) const -{ - return false; - /* - auto start = std::chrono::system_clock::now(); - constexpr uint32_t limit = 2048; - std::unordered_set keys; - std::unordered_map offers; - std::unordered_map> - books; - std::optional cursor; - size_t numOffers = 0; - uint32_t base = ledgerSequence; - auto rng = fetchLedgerRange(); - if (base != rng->minSequence) - { - base = (base >> indexerShift_) << indexerShift_; - base -= (1 << indexerShift_); - if (base < rng->minSequence) - base = rng->minSequence; - } - BOOST_LOG_TRIVIAL(info) - << __func__ << " base = " << std::to_string(base) - << " next to index = " << std::to_string(ledgerSequence); - while (true) - { - try - { - auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - for (auto& obj : objects) - { - if (isOffer(obj.blob)) - { - auto bookDir = getBook(obj.blob); - books[bookDir].insert(obj.key); - offers[obj.key] = bookDir; - ++numOffers; - } - keys.insert(std::move(obj.key)); - if (keys.size() % 100000 == 0) - BOOST_LOG_TRIVIAL(info) - << __func__ << " Fetched " - << std::to_string(keys.size()) << "keys"; - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - auto mid = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all keys from ledger " << std::to_string(base) - << " . num keys = " << keys.size() << " num books = " << books.size() - << " num offers = " << numOffers << " . Took " - << (mid - start).count() / 1000000000.0; - if (base == ledgerSequence) - { - BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys"; - writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence, numOffers); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote all keys from ledger " - << std::to_string(ledgerSequence) << " . num keys = " << keys.size() - << " . Took " << (end - mid).count() / 1000000000.0 - << ". Entire operation took " - << (end - start).count() / 1000000000.0; - } - else - { - writeBooks(books, base, numOffers); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote books. Skipping writing keys"; - } - - uint32_t prevLedgerSequence = base; - uint32_t nextLedgerSequence = - ((prevLedgerSequence >> indexerShift_) << indexerShift_); - BOOST_LOG_TRIVIAL(info) - << __func__ << " next base = " << std::to_string(nextLedgerSequence); - nextLedgerSequence += (1 << indexerShift_); - BOOST_LOG_TRIVIAL(info) - << __func__ << " next = " << std::to_string(nextLedgerSequence); - while (true) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " Processing diffs. nextLedger = " - << std::to_string(nextLedgerSequence); - auto rng = fetchLedgerRange(); - if (rng->maxSequence < nextLedgerSequence) - break; - start = std::chrono::system_clock::now(); - for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256) - { - auto start2 = std::chrono::system_clock::now(); - std::unordered_map< - ripple::uint256, - std::unordered_set> - booksDeleted; - size_t numOffersDeleted = 0; - // Get the diff and update keys - std::vector objs; - std::vector sequences(256, 0); - std::iota(sequences.begin(), sequences.end(), i + 1); - - auto diffs = fetchLedgerDiffs(sequences); - for (auto const& diff : diffs) - { - for (auto const& obj : diff.second) - { - // remove deleted keys - if (obj.blob.size() == 0) - { - keys.erase(obj.key); - if (offers.count(obj.key) > 0) - { - auto book = offers[obj.key]; - if (booksDeleted[book].insert(obj.key).second) - ++numOffersDeleted; - offers.erase(obj.key); - } - } - else - { - // insert other keys. keys is a set, so this is a - // noop if obj.key is already in keys - keys.insert(obj.key); - // if the object is an offer, add to books - if (isOffer(obj.blob)) - { - auto book = getBook(obj.blob); - if (books[book].insert(obj.key).second) - ++numOffers; - offers[obj.key] = book; - } - } - } - } - if (sequences.back() % 256 != 0) - { - BOOST_LOG_TRIVIAL(error) - << __func__ - << " back : " << std::to_string(sequences.back()) - << " front : " << std::to_string(sequences.front()) - << " size : " << std::to_string(sequences.size()); - throw std::runtime_error( - "Last sequence is not divisible by 256"); - } - - for (auto& book : booksDeleted) - { - for (auto& offerKey : book.second) - { - if (books[book.first].erase(offerKey)) - --numOffers; - } - } - writeBooks(books, sequences.back(), numOffers); - writeBooks(booksDeleted, sequences.back(), numOffersDeleted); - auto mid = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. Took " - << (mid - start2).count() / 1000000000.0; - } - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all from diffs " - << std::to_string(nextLedgerSequence) - << " shift width = " << std::to_string(indexerShift_) - << ". num keys = " << keys.size() << " . Took " - << (end - start).count() / 1000000000.0 - << " prev ledger = " << std::to_string(prevLedgerSequence); - writeKeys(keys, nextLedgerSequence); - prevLedgerSequence = nextLedgerSequence; - nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); - } - return true; -*/ -} bool CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { @@ -1285,11 +619,22 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const uint32_t minLedger = rng->maxSequence - numLedgersToKeep; if (minLedger <= rng->minSequence) return false; + auto bind = [this](auto& params) { + auto& [key, seq, obj] = params.data; + CassandraStatement statement{insertObject_}; + statement.bindBytes(key); + statement.bindInt(seq); + statement.bindBytes(obj); + return statement; + }; std::condition_variable cv; std::mutex mtx; - std::vector> cbs; + std::vector, + typename std::remove_reference::type>>> + cbs; uint32_t concurrentLimit = 10; - std::atomic_uint32_t numOutstanding = 0; + std::atomic_int numOutstanding = 0; // iterate through latest ledger, updating TTL std::optional cursor; @@ -1311,16 +656,15 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const for (auto& obj : objects) { ++numOutstanding; - cbs.push_back(std::make_shared( - *this, - std::move(obj.key), - minLedger, - std::move(obj.blob), - cv, + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_tuple( + std::move(obj.key), minLedger, std::move(obj.blob)), + bind, + numOutstanding, mtx, - numOutstanding)); + cv)); - onlineDelete(*cbs.back()); std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; cv.wait(lck, [&numOutstanding, concurrentLimit]() { diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 1a0ce7a4..b2322e94 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -36,26 +36,12 @@ namespace Backend { -void -flatMapWriteCallback(CassFuture* fut, void* cbData); -void -flatMapWriteKeyCallback(CassFuture* fut, void* cbData); -void -flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); -void -flatMapWriteBookCallback(CassFuture* fut, void* cbData); -void -flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData); void flatMapReadCallback(CassFuture* fut, void* cbData); void flatMapReadObjectCallback(CassFuture* fut, void* cbData); void flatMapGetCreatedCallback(CassFuture* fut, void* cbData); -void -flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); -void -flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); class CassandraPreparedStatement { @@ -571,7 +557,7 @@ public: ~CassandraAsyncResult() { - if (result_.isOk() or timedOut_) + if (result_.isOk() || timedOut_) { BOOST_LOG_TRIVIAL(trace) << "finished a request"; size_t batchSize = requestParams_.batchSize; @@ -644,11 +630,6 @@ private: CassandraPreparedStatement insertKey_; CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; - CassandraPreparedStatement selectBook_; - CassandraPreparedStatement completeBook_; - CassandraPreparedStatement insertBook_; - CassandraPreparedStatement insertBook2_; - CassandraPreparedStatement deleteBook_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; CassandraPreparedStatement insertLedgerHeader_; @@ -659,7 +640,6 @@ private: CassandraPreparedStatement selectLedgerBySeq_; CassandraPreparedStatement selectLatestLedger_; CassandraPreparedStatement selectLedgerRange_; - CassandraPreparedStatement selectLedgerDiff_; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -732,27 +712,11 @@ public: open_ = false; } CassandraPreparedStatement const& - getInsertKeyPreparedStatement() const - { - return insertKey_; - } - CassandraPreparedStatement const& - getInsertBookPreparedStatement() const - { - return insertBook2_; - } - CassandraPreparedStatement const& getInsertObjectPreparedStatement() const { return insertObject_; } - CassandraPreparedStatement const& - getSelectLedgerDiffPreparedStatement() const - { - return selectLedgerDiff_; - } - std::pair< std::vector, std::optional> @@ -803,39 +767,6 @@ public: return {{}, {}}; } - struct WriteLedgerHeaderCallbackData - { - CassandraBackend const* backend; - uint32_t sequence; - std::string header; - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteLedgerHeaderCallbackData( - CassandraBackend const* f, - uint32_t sequence, - std::string&& header) - : backend(f), sequence(sequence), header(std::move(header)) - { - } - }; - struct WriteLedgerHashCallbackData - { - CassandraBackend const* backend; - ripple::uint256 hash; - uint32_t sequence; - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteLedgerHashCallbackData( - CassandraBackend const* f, - ripple::uint256 hash, - uint32_t sequence) - : backend(f), hash(hash), sequence(sequence) - { - } - }; - bool doFinishWrites() const override { @@ -870,25 +801,6 @@ public: ripple::LedgerInfo const& ledgerInfo, std::string&& header, bool isFirst = false) const override; - void - writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const - { - CassandraStatement statement{insertLedgerHash_}; - statement.bindBytes(cb.hash); - statement.bindInt(cb.sequence); - executeAsyncWrite( - statement, flatMapWriteLedgerHashCallback, cb, isRetry); - } - - void - writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const - { - CassandraStatement statement{insertLedgerHeader_}; - statement.bindInt(cb.sequence); - statement.bindBytes(cb.header); - executeAsyncWrite( - statement, flatMapWriteLedgerHeaderCallback, cb, isRetry); - } std::optional fetchLatestLedgerSequence() const override @@ -1104,79 +1016,6 @@ public: std::vector const& keys, uint32_t sequence) const override; - struct WriteCallbackData - { - CassandraBackend const* backend; - std::string key; - uint32_t sequence; - uint32_t createdSequence = 0; - std::string blob; - bool isCreated; - bool isDeleted; - std::optional book; - - uint32_t currentRetries = 0; - std::atomic refs = 1; - - WriteCallbackData( - CassandraBackend const* f, - std::string&& key, - uint32_t sequence, - std::string&& blob, - bool isCreated, - bool isDeleted, - std::optional&& inBook) - : backend(f) - , key(std::move(key)) - , sequence(sequence) - , blob(std::move(blob)) - , isCreated(isCreated) - , isDeleted(isDeleted) - , book(std::move(inBook)) - { - } - }; - - struct WriteAccountTxCallbackData - { - CassandraBackend const* backend; - ripple::AccountID account; - uint32_t ledgerSequence; - uint32_t transactionIndex; - ripple::uint256 txHash; - - uint32_t currentRetries = 0; - std::atomic refs = 1; - - WriteAccountTxCallbackData( - CassandraBackend const* f, - ripple::AccountID&& account, - uint32_t lgrSeq, - uint32_t txIdx, - ripple::uint256&& hash) - : backend(f) - , account(std::move(account)) - , ledgerSequence(lgrSeq) - , transactionIndex(txIdx) - , txHash(std::move(hash)) - { - } - }; - - /* - void - write(WriteCallbackData& data, bool isRetry) const - { - { - CassandraStatement statement{insertObject_}; - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - statement.bindBytes(data.blob); - - executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry); - } - }*/ - void doWriteLedgerObject( std::string&& key, @@ -1190,35 +1029,6 @@ public: writeAccountTransactions( std::vector&& data) const override; - struct WriteTransactionCallbackData - { - CassandraBackend const* backend; - // The shared pointer to the node object must exist until it's - // confirmed persisted. Otherwise, it can become deleted - // prematurely if other copies are removed from caches. - std::string hash; - uint32_t sequence; - std::string transaction; - std::string metadata; - - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteTransactionCallbackData( - CassandraBackend const* f, - std::string&& hash, - uint32_t sequence, - std::string&& transaction, - std::string&& metadata) - : backend(f) - , hash(std::move(hash)) - , sequence(sequence) - , transaction(std::move(transaction)) - , metadata(std::move(metadata)) - { - } - }; - void writeTransaction( std::string&& hash, @@ -1247,27 +1057,10 @@ public: return ioContext_; } - friend void - flatMapWriteCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteKeyCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteBookCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); - friend void flatMapReadCallback(CassFuture* fut, void* cbData); friend void flatMapReadObjectCallback(CassFuture* fut, void* cbData); - friend void - flatMapGetCreatedCallback(CassFuture* fut, void* cbData); inline void incremementOutstandingRequestCount() const From 599bd1b6554175f63e9211501038fd600176f221 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Mon, 12 Jul 2021 20:57:03 +0000 Subject: [PATCH 08/11] refactor of cassandra backend, needs testing --- src/backend/CassandraBackend.cpp | 204 ++++++++++++------------------- src/backend/CassandraBackend.h | 178 --------------------------- 2 files changed, 75 insertions(+), 307 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 19fe7c25..d45e1eb3 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -2,19 +2,6 @@ #include #include #include -/* -namespace std { -template <> -struct hash -{ - std::size_t - operator()(const ripple::uint256& k) const noexcept - { - return boost::hash_range(k.begin(), k.end()); - } -}; -} // namespace std -*/ namespace Backend { template void @@ -53,104 +40,20 @@ void processAsyncWrite(CassFuture* fut, void* cbData) { T& requestParams = *static_cast(cbData); + // TODO don't pass in func processAsyncWriteResponse(requestParams, fut, requestParams.retry); } -/* -template -void -processAsyncRead(CassFuture* fut, void* cbData) -{ - T& requestParams = *static_cast(cbData); - CassError rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - requestParams.result = {}; - } - else - { - CassandraResult result = - std::move(CassandraResult(cass_future_get_result(fut))); - requestParams.populate(result); - - std::lock_guard lck(requestParams.mtx); - size_t batchSize = requestParams.batchSize; - if (++(requestParams_.numFinished) == batchSize) - requestParams_.cv.notify_all(); - } -} -*/ -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::ReadCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { params.backend.read(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func}; - if (asyncResult.timedOut()) - requestParams.result.transaction = {0}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - requestParams.result = { - result.getBytes(), result.getBytes(), result.getUInt32()}; - } -} - -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadObjectCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::ReadObjectCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { params.backend.readObject(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func}; - if (asyncResult.timedOut()) - requestParams.result = {0}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - requestParams.result = result.getBytes(); - } -} - -/* -template -struct ReadCallbackData -{ - using Finisher = std::function; - T data; - CassandraBackend const* backend; - Finisher finish; - ReadCallbackData(CassandraBackend const* backend, T&& d, Finisher f) - : backend(b), data(d), finish(f) - { - } - - void - finish(CassandraResult& res) - { - finish(res) - } -}; -*/ template -struct CallbackData +struct WriteCallbackData { CassandraBackend const* backend; T data; - std::function&, bool)> retry; + std::function&, bool)> retry; uint32_t currentRetries; std::atomic refs = 1; - CallbackData(CassandraBackend const* b, T&& d, B bind) + WriteCallbackData(CassandraBackend const* b, T&& d, B bind) : backend(b), data(std::move(d)) { retry = [bind, this](auto& params, bool isRetry) { @@ -177,12 +80,12 @@ struct CallbackData if (remaining == 0) delete this; } - virtual ~CallbackData() + virtual ~WriteCallbackData() { } }; template -struct BulkWriteCallbackData : public CallbackData +struct BulkWriteCallbackData : public WriteCallbackData { std::mutex& mtx; std::condition_variable& cv; @@ -194,7 +97,7 @@ struct BulkWriteCallbackData : public CallbackData std::atomic_int& r, std::mutex& m, std::condition_variable& c) - : CallbackData(b, std::move(d), bind) + : WriteCallbackData(b, std::move(d), bind) , numRemaining(r) , mtx(m) , cv(c) @@ -211,8 +114,8 @@ struct BulkWriteCallbackData : public CallbackData { // TODO: it would be nice to avoid this lock. std::lock_guard lck(mtx); - --numRemaining; - cv.notify_one(); + if (--numRemaining == 0) + cv.notify_one(); } ~BulkWriteCallbackData() { @@ -223,7 +126,7 @@ template void makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind) { - auto* cb = new CallbackData(b, std::move(d), bind); + auto* cb = new WriteCallbackData(b, std::move(d), bind); cb->start(); } template @@ -380,12 +283,53 @@ CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); return fetchTransactions(hashes); } + +struct ReadCallbackData +{ + std::function onSuccess; + std::atomic_int& numOutstanding; + std::mutex& mtx; + std::condition_variable& cv; + bool errored = false; + ReadCallbackData( + std::atomic_int& numOutstanding, + std::mutex& m, + std::condition_variable& cv, + std::function onSuccess) + : numOutstanding(numOutstanding), mtx(m), cv(cv), onSuccess(onSuccess) + { + } + + void + finish(CassFuture* fut) + { + CassError rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + errored = true; + } + else + { + CassandraResult result{cass_future_get_result(fut)}; + onSuccess(result); + } + std::lock_guard lck(mtx); + if (--numOutstanding == 0) + cv.notify_one(); + } +}; +void +processAsyncRead(CassFuture* fut, void* cbData) +{ + ReadCallbackData cb = *static_cast(cbData); + cb.finish(fut); +} std::vector CassandraBackend::fetchTransactions( std::vector const& hashes) const { std::size_t const numHashes = hashes.size(); - std::atomic_uint32_t numFinished = 0; + std::atomic_int numOutstanding = numHashes; std::condition_variable cv; std::mutex mtx; std::vector results{numHashes}; @@ -394,19 +338,23 @@ CassandraBackend::fetchTransactions( auto start = std::chrono::system_clock::now(); for (std::size_t i = 0; i < hashes.size(); ++i) { + CassandraStatement statement{selectTransaction_}; + statement.bindBytes(hashes[i]); cbs.push_back(std::make_shared( - *this, hashes[i], results[i], cv, numFinished, numHashes)); - read(*cbs[i]); + numOutstanding, mtx, cv, [i, &results](auto& result) { + results[i] = { + result.getBytes(), result.getBytes(), result.getUInt32()}; + })); + executeAsyncRead(statement, processAsyncRead, *cbs[i]); } assert(results.size() == cbs.size()); std::unique_lock lck(mtx); - cv.wait( - lck, [&numFinished, &numHashes]() { return numFinished == numHashes; }); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); auto end = std::chrono::system_clock::now(); - for (auto const& res : results) + for (auto const& cb : cbs) { - if (res.transaction.size() == 1 && res.transaction[0] == 0) + if (cb->errored) throw DatabaseTimeout(); } @@ -522,25 +470,30 @@ CassandraBackend::fetchLedgerObjects( std::size_t const numKeys = keys.size(); BOOST_LOG_TRIVIAL(trace) << "Fetching " << numKeys << " records from Cassandra"; - std::atomic_uint32_t numFinished = 0; + std::atomic_int numOutstanding = numKeys; std::condition_variable cv; std::mutex mtx; std::vector results{numKeys}; - std::vector> cbs; + std::vector> cbs; cbs.reserve(numKeys); for (std::size_t i = 0; i < keys.size(); ++i) { - cbs.push_back(std::make_shared( - *this, keys[i], sequence, results[i], cv, numFinished, numKeys)); - readObject(*cbs[i]); + cbs.push_back(std::make_shared( + numOutstanding, mtx, cv, [i, &results](auto& result) { + results[i] = result.getBytes(); + })); + CassandraStatement statement{selectObject_}; + statement.bindBytes(keys[i]); + statement.bindInt(sequence); + executeAsyncRead(statement, processAsyncRead, *cbs[i]); } assert(results.size() == cbs.size()); std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &numKeys]() { return numFinished == numKeys; }); - for (auto const& res : results) + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); + for (auto const& cb : cbs) { - if (res.size() == 1 && res[0] == 0) + if (cb->errored) throw DatabaseTimeout(); } @@ -1220,13 +1173,6 @@ CassandraBackend::open(bool readOnly) << " is_latest IN (true, false)"; if (!selectLedgerRange_.prepareStatement(query, session_.get())) continue; - /* - query.str(""); - query << " SELECT key,object FROM " << tablePrefix - << "objects WHERE sequence = ?"; - if (!selectLedgerDiff_.prepareStatement(query, session_.get())) - continue; - */ setupPreparedStatements = true; } diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index b2322e94..efa9baa3 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -36,13 +36,6 @@ namespace Backend { -void -flatMapReadCallback(CassFuture* fut, void* cbData); -void -flatMapReadObjectCallback(CassFuture* fut, void* cbData); -void -flatMapGetCreatedCallback(CassFuture* fut, void* cbData); - class CassandraPreparedStatement { private: @@ -521,63 +514,6 @@ isTimeout(CassError rc) return true; return false; } -template -class CassandraAsyncResult -{ - T& requestParams_; - CassandraResult result_; - bool timedOut_ = false; - bool retryOnTimeout_ = false; - -public: - CassandraAsyncResult( - T& requestParams, - CassFuture* fut, - F retry, - bool retryOnTimeout = false) - : requestParams_(requestParams), retryOnTimeout_(retryOnTimeout) - { - CassError rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // TODO - should we ever be retrying requests? These are reads, - // and they usually only fail when the db is under heavy load. Seems - // best to just return an error to the client and have the client - // try again - if (isTimeout(rc)) - timedOut_ = true; - if (!timedOut_ || retryOnTimeout_) - retry(requestParams_); - } - else - { - result_ = std::move(CassandraResult(cass_future_get_result(fut))); - } - } - - ~CassandraAsyncResult() - { - if (result_.isOk() || timedOut_) - { - BOOST_LOG_TRIVIAL(trace) << "finished a request"; - size_t batchSize = requestParams_.batchSize; - if (++(requestParams_.numFinished) == batchSize) - requestParams_.cv.notify_all(); - } - } - - CassandraResult& - getResult() - { - return result_; - } - - bool - timedOut() - { - return timedOut_; - } -}; class CassandraBackend : public BackendInterface { @@ -629,7 +565,6 @@ private: CassandraPreparedStatement getToken_; CassandraPreparedStatement insertKey_; CassandraPreparedStatement selectKeys_; - CassandraPreparedStatement getBook_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; CassandraPreparedStatement insertLedgerHeader_; @@ -677,17 +612,10 @@ public: ~CassandraBackend() override { - BOOST_LOG_TRIVIAL(info) << __func__; if (open_) close(); } - std::string - getName() - { - return "cassandra"; - } - bool isOpen() { @@ -711,11 +639,6 @@ public: } open_ = false; } - CassandraPreparedStatement const& - getInsertObjectPreparedStatement() const - { - return insertObject_; - } std::pair< std::vector, @@ -904,18 +827,6 @@ public: std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const override; - std::vector - fetchLedgerDiff(uint32_t ledgerSequence) const; - std::map> - fetchLedgerDiffs(std::vector const& sequences) const; - - bool - runIndexer(uint32_t ledgerSequence) const; - bool - isIndexed(uint32_t ledgerSequence) const; - - std::optional - getNextToIndex() const; bool writeKeys( @@ -923,94 +834,10 @@ public: KeyIndex const& index, bool isAsync = false) const override; - bool - canFetchBatch() - { - return true; - } - - struct ReadCallbackData - { - CassandraBackend const& backend; - ripple::uint256 const& hash; - TransactionAndMetadata& result; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& hash, - TransactionAndMetadata& result, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, - size_t batchSize) - : backend(backend) - , hash(hash) - , result(result) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } - - ReadCallbackData(ReadCallbackData const& other) = default; - }; - std::vector fetchTransactions( std::vector const& hashes) const override; - void - read(ReadCallbackData& data) const - { - CassandraStatement statement{selectTransaction_}; - statement.bindBytes(data.hash); - executeAsyncRead(statement, flatMapReadCallback, data); - } - - struct ReadObjectCallbackData - { - CassandraBackend const& backend; - ripple::uint256 const& key; - uint32_t sequence; - Blob& result; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadObjectCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& key, - uint32_t sequence, - Blob& result, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, - size_t batchSize) - : backend(backend) - , key(key) - , sequence(sequence) - , result(result) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } - - ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; - }; - - void - readObject(ReadObjectCallbackData& data) const - { - CassandraStatement statement{selectObject_}; - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - - executeAsyncRead(statement, flatMapReadObjectCallback, data); - } std::vector fetchLedgerObjects( std::vector const& keys, @@ -1057,11 +884,6 @@ public: return ioContext_; } - friend void - flatMapReadCallback(CassFuture* fut, void* cbData); - friend void - flatMapReadObjectCallback(CassFuture* fut, void* cbData); - inline void incremementOutstandingRequestCount() const { From 1cdd238ad36f5ad0e04495dfb26ed67b39005d29 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 13 Jul 2021 17:06:15 +0000 Subject: [PATCH 09/11] bug fix --- src/backend/BackendIndexer.cpp | 1 - src/backend/CassandraBackend.cpp | 19 +++++++++++++++---- src/backend/CassandraBackend.h | 13 +++++++++++++ src/etl/ReportingETL.cpp | 4 ++++ 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/backend/BackendIndexer.cpp b/src/backend/BackendIndexer.cpp index 4aa17fa9..f3ddf8c5 100644 --- a/src/backend/BackendIndexer.cpp +++ b/src/backend/BackendIndexer.cpp @@ -209,7 +209,6 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) BOOST_LOG_TRIVIAL(debug) << __func__ << " starting. sequence = " << std::to_string(ledgerSequence); - bool isFirst = false; auto keyIndex = getKeyIndexOfSeq(ledgerSequence); if (isFirst_) { diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index d45e1eb3..439caa7f 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -7,6 +7,7 @@ template void processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) { + BOOST_LOG_TRIVIAL(debug) << __func__ << " Processing async write response"; CassandraBackend const& backend = *requestParams.backend; auto rc = cass_future_error_code(fut); if (rc != CASS_OK) @@ -39,6 +40,7 @@ template void processAsyncWrite(CassFuture* fut, void* cbData) { + BOOST_LOG_TRIVIAL(debug) << __func__ << " processing async write"; T& requestParams = *static_cast(cbData); // TODO don't pass in func processAsyncWriteResponse(requestParams, fut, requestParams.retry); @@ -56,6 +58,7 @@ struct WriteCallbackData WriteCallbackData(CassandraBackend const* b, T&& d, B bind) : backend(b), data(std::move(d)) { + BOOST_LOG_TRIVIAL(debug) << "Making WriteCallbackData"; retry = [bind, this](auto& params, bool isRetry) { auto statement = bind(params); backend->executeAsyncWrite( @@ -69,7 +72,10 @@ struct WriteCallbackData virtual void start() { + BOOST_LOG_TRIVIAL(debug) << "Starting"; + BOOST_LOG_TRIVIAL(debug) << "address is " << this; retry(*this, false); + BOOST_LOG_TRIVIAL(debug) << "Started"; } virtual void @@ -82,6 +88,7 @@ struct WriteCallbackData } virtual ~WriteCallbackData() { + BOOST_LOG_TRIVIAL(debug) << __func__; } }; template @@ -342,8 +349,11 @@ CassandraBackend::fetchTransactions( statement.bindBytes(hashes[i]); cbs.push_back(std::make_shared( numOutstanding, mtx, cv, [i, &results](auto& result) { - results[i] = { - result.getBytes(), result.getBytes(), result.getUInt32()}; + if (result.hasResult()) + results[i] = { + result.getBytes(), + result.getBytes(), + result.getUInt32()}; })); executeAsyncRead(statement, processAsyncRead, *cbs[i]); } @@ -480,7 +490,8 @@ CassandraBackend::fetchLedgerObjects( { cbs.push_back(std::make_shared( numOutstanding, mtx, cv, [i, &results](auto& result) { - results[i] = result.getBytes(); + if (result.hasResult()) + results[i] = result.getBytes(); })); CassandraStatement statement{selectObject_}; statement.bindBytes(keys[i]); @@ -515,7 +526,7 @@ CassandraBackend::writeKeys( statement.bindBytes(key); return statement; }; - std::atomic_int numOutstanding = keys.size(); + std::atomic_int numOutstanding = 0; std::condition_variable cv; std::mutex mtx; std::vector void diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 0cb2a7a7..b577930e 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -114,10 +114,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) << "Deserialized ledger header. " << detail::toString(lgrInfo); backend_->startWrites(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " started writes"; backend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); + BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote ledger"; std::vector accountTxData = insertTransactions(lgrInfo, *ledgerData); + BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns"; auto start = std::chrono::system_clock::now(); @@ -126,6 +129,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) // consumes from the queue and inserts the data into the Ledger object. // Once the below call returns, all data has been pushed into the queue loadBalancer_->loadInitialLedger(startingSequence); + BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger"; if (!stopping_) { From cb2975dd413de7e5940516268bfcf2190564f5cc Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 13 Jul 2021 20:27:26 +0000 Subject: [PATCH 10/11] smaller partitions on keys table --- src/backend/CassandraBackend.cpp | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 439caa7f..9f2fb5c6 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -407,10 +407,11 @@ CassandraBackend::fetchAllTransactionHashesInLedger( LedgerPage CassandraBackend::doFetchLedgerPage( - std::optional const& cursor, + std::optional const& cursorIn, std::uint32_t ledgerSequence, std::uint32_t limit) const { + std::optional cursor = cursorIn; auto index = getKeyIndexOfSeq(ledgerSequence); if (!index) return {}; @@ -423,13 +424,13 @@ CassandraBackend::doFetchLedgerPage( << __func__ << " - Cursor = " << ripple::strHex(*cursor); CassandraStatement statement{selectKeys_}; statement.bindInt(index->keyIndex); - if (cursor) - statement.bindBytes(*cursor); - else + if (!cursor) { ripple::uint256 zero; - statement.bindBytes(zero); + cursor = zero; } + statement.bindBytes(cursor->data(), 1); + statement.bindBytes(*cursor); statement.bindUInt(limit + 1); CassandraResult result = executeSyncRead(statement); if (!!result) @@ -447,6 +448,12 @@ CassandraBackend::doFetchLedgerPage( page.cursor = keys.back(); ++(*page.cursor); } + else if (cursor->data()[0] != 0xFF) + { + ripple::uint256 zero; + zero.data()[0] = cursor->data()[0] + 1; + page.cursor = zero; + } auto objects = fetchLedgerObjects(keys, ledgerSequence); if (objects.size() != keys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); @@ -523,6 +530,7 @@ CassandraBackend::writeKeys( auto& [lgrSeq, key] = params.data; CassandraStatement statement{insertKey_}; statement.bindInt(lgrSeq); + statement.bindBytes(key.data(), 1); statement.bindBytes(key); return statement; }; @@ -974,8 +982,8 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" - << " ( sequence bigint, key blob, PRIMARY KEY " - "(sequence, key))" + << " ( sequence bigint, first_byte blob, key blob, PRIMARY KEY " + "((sequence,first_byte), key))" " WITH default_time_to_live = " << std::to_string(keysTtl); if (!executeSimpleStatement(query.str())) @@ -1072,13 +1080,14 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "INSERT INTO " << tablePrefix << "keys" - << " (sequence, key) VALUES (?, ?)"; + << " (sequence,first_byte, key) VALUES (?, ?, ?)"; if (!insertKey_.prepareStatement(query, session_.get())) continue; query.str(""); query << "SELECT key FROM " << tablePrefix << "keys" - << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?"; + << " WHERE sequence = ? AND first_byte = ? AND key >= ? ORDER BY " + "key ASC LIMIT ?"; if (!selectKeys_.prepareStatement(query, session_.get())) continue; From 649ecf4eda57ef54c45de94da5eb19e1e3ffb092 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 14 Jul 2021 20:29:55 +0000 Subject: [PATCH 11/11] avoid wide rows with account_tx --- src/backend/CassandraBackend.cpp | 108 +++++++++++++++++++++++++++++-- src/backend/CassandraBackend.h | 51 +++------------ test.py | 5 +- 3 files changed, 113 insertions(+), 51 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 9f2fb5c6..c7719cd3 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -222,6 +222,9 @@ CassandraBackend::writeAccountTransactions( CassandraStatement statement(insertAccountTx_); auto& [account, lgrSeq, txnIdx, hash] = params.data; statement.bindBytes(account); + char firstByte = static_cast(lgrSeq >> 20); + statement.bindBytes(&firstByte, 1); + statement.bindIntTuple(lgrSeq, txnIdx); statement.bindBytes(hash); return statement; @@ -404,6 +407,101 @@ CassandraBackend::fetchAllTransactionHashesInLedger( << " milliseconds"; return hashes; } +std::pair< + std::vector, + std::optional> +CassandraBackend::fetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursorIn) const +{ + auto rng = fetchLedgerRange(); + if (!rng) + return {{}, {}}; + std::pair< + std::vector, + std::optional> + res; + auto cursor = cursorIn; + do + { + auto interim = doFetchAccountTransactions(account, limit, cursor); + for (auto& txn : interim.first) + { + res.first.push_back(txn); + } + res.second = cursor = interim.second; + limit -= interim.first.size(); + uint32_t seq = cursor->ledgerSequence; + seq = ((seq >> 20) - 1) << 20; + cursor->ledgerSequence = seq; + } while (res.first.size() < limit && + cursor->ledgerSequence >= rng->minSequence); + if (res.first.size() < limit) + res.second = {}; + return res; +} +std::pair< + std::vector, + std::optional> +CassandraBackend::doFetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursor) const +{ + BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; + CassandraStatement statement{selectAccountTx_}; + statement.bindBytes(account); + if (cursor) + { + char firstByte = static_cast(cursor->ledgerSequence >> 20); + statement.bindBytes(&firstByte, 1); + statement.bindIntTuple( + cursor->ledgerSequence, cursor->transactionIndex); + } + else + { + auto rng = fetchLedgerRange(); + if (!rng) + return {{}, {}}; + uint32_t max = rng->maxSequence; + char firstByte = static_cast(max >> 20); + statement.bindBytes(&firstByte, 1); + + statement.bindIntTuple(INT32_MAX, INT32_MAX); + } + statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + if (!result.hasResult()) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; + return {{}, {}}; + } + + std::vector hashes; + size_t numRows = result.numRows(); + std::optional retCursor; + + BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); + do + { + hashes.push_back(result.getUInt256()); + --numRows; + if (numRows == 0) + { + auto [lgrSeq, txnIdx] = result.getInt64Tuple(); + retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; + } + } while (result.nextRow()); + + BOOST_LOG_TRIVIAL(debug) + << "doAccountTx - populated hashes. num hashes = " << hashes.size(); + if (hashes.size()) + { + return {fetchTransactions(hashes), retCursor}; + } + return {{}, {}}; +} LedgerPage CassandraBackend::doFetchLedgerPage( @@ -996,10 +1094,11 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" - << " ( account blob, seq_idx tuple, " + << " ( account blob, seq_first_byte blob, seq_idx " + "tuple, " " hash blob, " "PRIMARY KEY " - "(account, seq_idx)) WITH " + "((account,seq_first_byte), seq_idx)) WITH " "CLUSTERING ORDER BY (seq_idx desc)" << " AND default_time_to_live = " << std::to_string(ttl); @@ -1138,14 +1237,15 @@ CassandraBackend::open(bool readOnly) query.str(""); query << " INSERT INTO " << tablePrefix << "account_tx" - << " (account, seq_idx, hash) " - << " VALUES (?,?,?)"; + << " (account, seq_first_byte, seq_idx, hash) " + << " VALUES (?,?,?,?)"; if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; query.str(""); query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " + << " AND seq_first_byte = ? " << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 92ea0889..f62343f4 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -655,49 +655,14 @@ public: fetchAccountTransactions( ripple::AccountID const& account, std::uint32_t limit, - std::optional const& cursor) const override - { - BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; - CassandraStatement statement{selectAccountTx_}; - statement.bindBytes(account); - if (cursor) - statement.bindIntTuple( - cursor->ledgerSequence, cursor->transactionIndex); - else - statement.bindIntTuple(INT32_MAX, INT32_MAX); - statement.bindUInt(limit); - CassandraResult result = executeSyncRead(statement); - if (!result.hasResult()) - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; - return {{}, {}}; - } - - std::vector hashes; - size_t numRows = result.numRows(); - bool returnCursor = numRows == limit; - std::optional retCursor; - - BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); - do - { - hashes.push_back(result.getUInt256()); - --numRows; - if (numRows == 0 && returnCursor) - { - auto [lgrSeq, txnIdx] = result.getInt64Tuple(); - retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; - } - } while (result.nextRow()); - - BOOST_LOG_TRIVIAL(debug) - << "doAccountTx - populated hashes. num hashes = " << hashes.size(); - if (hashes.size()) - { - return {fetchTransactions(hashes), retCursor}; - } - return {{}, {}}; - } + std::optional const& cursor) const override; + std::pair< + std::vector, + std::optional> + doFetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursor) const; bool doFinishWrites() const override diff --git a/test.py b/test.py index 07a45cf6..36126d8a 100755 --- a/test.py +++ b/test.py @@ -347,9 +347,6 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No else: print(res) break - if numCalls > numPages: - print("breaking") - break return results except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -1043,7 +1040,7 @@ def run(args): args.account = res["transaction"]["Account"] print("starting") res = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages))) + account_tx_full(args.ip, args.port, args.account, args.binary,None,None)) rng = getMinAndMax(res) print(len(res["transactions"])) print(args.account)