diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 6815f2b5..d41085b3 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -51,6 +51,13 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) delete &requestParams; } } +template +void +processAsyncWrite(CassFuture* fut, void* cbData) +{ + T& requestParams = *static_cast(cbData); + processAsyncWriteResponse(requestParams, fut, requestParams.retry); +} // Process the result of an asynchronous write. Retry on error // @param fut cassandra future associated with the write // @param cbData struct that holds the request parameters @@ -217,6 +224,54 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } +template +struct CallbackData +{ + CassandraBackend const* backend; + T data; + F retry; + uint32_t currentRetries; + std::atomic refs = 1; + + CallbackData(CassandraBackend const* b, T&& d, F f) + : backend(b), data(std::move(d)), retry(f) + { + } +}; + +void +CassandraBackend::writeTransaction( + std::string&& hash, + uint32_t seq, + std::string&& transaction, + std::string&& metadata) const +{ + BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; + std::string hashCpy = hash; + auto func = [this](auto& params, bool retry) { + CassandraStatement statement{insertLedgerTransaction_}; + statement.bindInt(params.data.first); + statement.bindBytes(params.data.second); + executeAsyncWrite( + statement, + processAsyncWrite< + typename std::remove_reference::type>, + params, + retry); + }; + auto* lgrSeqToHash = + new CallbackData(this, std::make_pair(seq, std::move(hashCpy)), func); + WriteTransactionCallbackData* data = new WriteTransactionCallbackData( + this, + std::move(hash), + seq, + std::move(transaction), + std::move(metadata)); + + writeTransaction(*data, false); + func(*lgrSeqToHash, false); +} + std::optional CassandraBackend::fetchLedgerRange() const { @@ -243,21 +298,8 @@ CassandraBackend::fetchLedgerRange() const std::vector CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const { - CassandraStatement statement{selectAllTransactionsInLedger_}; - statement.bindInt(ledgerSequence); - CassandraResult result = executeSyncRead(statement); - if (!result) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; - return {}; - } - std::vector txns; - do - { - txns.push_back( - {result.getBytes(), result.getBytes(), result.getUInt32()}); - } while (result.nextRow()); - return txns; + auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); + return fetchTransactions(hashes); } std::vector CassandraBackend::fetchAllTransactionHashesInLedger( @@ -1373,6 +1415,14 @@ CassandraBackend::open(bool readOnly) << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; + query.str(""); + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix + << "ledger_transactions" + << " ( ledger_sequence bigint, hash blob, PRIMARY " + "KEY(ledger_sequence, hash))" + << " WITH default_time_to_live = " << std::to_string(ttl); + if (!executeSimpleStatement(query.str())) + continue; query.str(""); query << "SELECT * FROM " << tablePrefix << "transactions" @@ -1483,6 +1533,12 @@ CassandraBackend::open(bool readOnly) "?, ?)"; if (!insertTransaction_.prepareStatement(query, session_.get())) continue; + query.str(""); + query << "INSERT INTO " << tablePrefix << "ledger_transactions" + << " (ledger_sequence, hash) VALUES " + "(?, ?)"; + if (!insertLedgerTransaction_.prepareStatement(query, session_.get())) + continue; query.str(""); query << "INSERT INTO " << tablePrefix << "keys" @@ -1519,7 +1575,7 @@ CassandraBackend::open(bool readOnly) query, session_.get())) continue; query.str(""); - query << "SELECT hash FROM " << tablePrefix << "transactions" + query << "SELECT hash FROM " << tablePrefix << "ledger_transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionHashesInLedger_.prepareStatement( query, session_.get())) diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 4f958530..f78f1b88 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -26,13 +26,13 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include namespace Backend { @@ -633,6 +633,7 @@ private: // than making a new statement CassandraPreparedStatement insertObject_; CassandraPreparedStatement insertTransaction_; + CassandraPreparedStatement insertLedgerTransaction_; CassandraPreparedStatement selectTransaction_; CassandraPreparedStatement selectAllTransactionsInLedger_; CassandraPreparedStatement selectAllTransactionHashesInLedger_; @@ -1319,18 +1320,7 @@ public: std::string&& hash, uint32_t seq, std::string&& transaction, - std::string&& metadata) const override - { - BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; - WriteTransactionCallbackData* data = new WriteTransactionCallbackData( - this, - std::move(hash), - seq, - std::move(transaction), - std::move(metadata)); - - writeTransaction(*data, false); - } + std::string&& metadata) const override; void startWrites() const override diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 35eff2ae..0cb2a7a7 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -529,7 +529,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) uint32_t currentSequence = startSequence; int counter = 0; - std::atomic_int per = 4; + std::atomic_int per = 100; auto startTimer = [this, &per]() { auto innerFunc = [this, &per](auto& f) -> void { std::shared_ptr timer = @@ -549,7 +549,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) }; innerFunc(innerFunc); }; - startTimer(); + // startTimer(); + auto begin = std::chrono::system_clock::now(); while (!writeConflict) @@ -608,6 +609,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = false; }); } + /* if (++counter >= per) { std::chrono::milliseconds sleep = @@ -619,6 +621,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) counter = 0; begin = std::chrono::system_clock::now(); } + */ } }}; diff --git a/test.py b/test.py index 9b821a46..07a45cf6 100755 --- a/test.py +++ b/test.py @@ -699,11 +699,14 @@ async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls start = datetime.datetime.now().timestamp() await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":True, "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) - print(res["header"]["blob"]) end = datetime.datetime.now().timestamp() if (end - start) > 0.1: - print("request took more than 100ms") + print("request took more than 100ms : " + str(end - start)) numCalls = numCalls + 1 + if "error" in res: + print(res["error"]) + else: + print(res["header"]["blob"]) except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -842,8 +845,13 @@ args = parser.parse_args() def run(args): asyncio.set_event_loop(asyncio.new_event_loop()) - if(args.ledger is None): - args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] + rng =asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) + if args.ledger is None: + args.ledger = rng[1] + if args.maxLedger == -1: + args.maxLedger = rng[1] + if args.minLedger == -1: + args.minLedger = rng[0] if args.action == "fee": asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port)) elif args.action == "server_info": @@ -891,6 +899,7 @@ def run(args): end = datetime.datetime.now().timestamp() num = int(args.numRunners) * int(args.numCalls) print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + print("Latency = " + str((end - start) / int(args.numCalls)) + " seconds") elif args.action == "ledger_entries": keys = [] ledger_index = 0