From d62f7accfea8ff9ff9d013e1bfe7868230d24138 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 11 Mar 2021 13:14:21 -0500 Subject: [PATCH] fix race condition in Postgres backend. add cursor to account_tx response. test script verifies account_tx --- handlers/AccountTx.cpp | 8 +++- reporting/PostgresBackend.cpp | 79 +++++++++++++++------------------ reporting/PostgresBackend.h | 2 +- test.py | 82 ++++++++++++++++++++++++++++++----- 4 files changed, 115 insertions(+), 56 deletions(-) diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index cbab3203..8b35e293 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -209,7 +209,13 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) txns.push_back(obj); } response["transactions"] = txns; - response["cursor"] = {}; + if (retCursor) + { + boost::json::object cursorJson; + cursorJson["ledger_sequence"] = retCursor->ledgerSequence; + cursorJson["transaction_index"] = retCursor->transactionIndex; + response["cursor"] = cursorJson; + } return response; } diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 62335b93..73c41f74 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -12,35 +12,13 @@ PostgresBackend::writeLedger( std::string&& ledgerHeader, bool isFirst) const { - PgQuery pgQuery(pgPool_); - BOOST_LOG_TRIVIAL(debug) << __func__; - auto cmd = boost::format( - R"(INSERT INTO ledgers - VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))"); - - auto ledgerInsert = boost::str( - cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) % - ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() % - ledgerInfo.closeTime.time_since_epoch().count() % - ledgerInfo.parentCloseTime.time_since_epoch().count() % - ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags % - ripple::strHex(ledgerInfo.accountHash) % - ripple::strHex(ledgerInfo.txHash)); - BOOST_LOG_TRIVIAL(info) << __func__ << " : " - << " : " - << "query string = " << ledgerInsert; - - auto res = pgQuery(ledgerInsert.data()); - - abortWrite_ = !res; + ledgerHeader_ = ledgerInfo; } void PostgresBackend::writeAccountTransactions( std::vector&& data) const { - if (abortWrite_) - return; PgQuery pg(pgPool_); for (auto const& record : data) { @@ -64,8 +42,6 @@ PostgresBackend::writeLedgerObject( bool isDeleted, std::optional&& book) const { - if (abortWrite_) - return; objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t' << std::to_string(seq) << '\t' << "\\\\x" << ripple::strHex(blob) << '\n'; @@ -94,8 +70,6 @@ PostgresBackend::writeTransaction( std::string&& transaction, std::string&& metadata) const { - if (abortWrite_) - return; transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t' << std::to_string(seq) << '\t' << "\\\\x" << ripple::strHex(transaction) << '\t' << "\\\\x" @@ -476,9 +450,12 @@ PostgresBackend::fetchAccountTransactions( "account_transactions WHERE account = " << "\'\\x" << ripple::strHex(account) << "\'"; if (cursor) - sql << " AND ledger_seq < " << cursor->ledgerSequence - << " AND transaction_index < " << cursor->transactionIndex; + sql << " AND (ledger_seq < " << cursor->ledgerSequence + << " OR (ledger_seq = " << cursor->ledgerSequence + << " AND transaction_index < " << cursor->transactionIndex << "))"; + sql << " ORDER BY ledger_seq DESC, transaction_index DESC"; sql << " LIMIT " << std::to_string(limit); + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << sql.str(); auto res = pgQuery(sql.str().data()); if (size_t numRows = checkResult(res, 3)) { @@ -515,6 +492,12 @@ PostgresBackend::close() void PostgresBackend::startWrites() const +{ + numRowsInObjectsBuffer_ = 0; +} + +bool +PostgresBackend::finishWrites() const { PgQuery pg(pgPool_); auto res = pg("BEGIN"); @@ -524,22 +507,30 @@ PostgresBackend::startWrites() const msg << "Postgres error creating transaction: " << res.msg(); throw std::runtime_error(msg.str()); } - numRowsInObjectsBuffer_ = 0; -} + auto cmd = boost::format( + R"(INSERT INTO ledgers + VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))"); -bool -PostgresBackend::finishWrites() const -{ - if (abortWrite_) - return false; - PgQuery pg(pgPool_); - pg.bulkInsert("transactions", transactionsBuffer_.str()); - pg.bulkInsert("books", booksBuffer_.str()); - pg.bulkInsert("account_transactions", accountTxBuffer_.str()); - std::string objectsStr = objectsBuffer_.str(); - if (objectsStr.size()) - pg.bulkInsert("objects", objectsStr); - auto res = pg("COMMIT"); + auto ledgerInsert = boost::str( + cmd % ledgerHeader_.seq % ripple::strHex(ledgerHeader_.hash) % + ripple::strHex(ledgerHeader_.parentHash) % ledgerHeader_.drops.drops() % + ledgerHeader_.closeTime.time_since_epoch().count() % + ledgerHeader_.parentCloseTime.time_since_epoch().count() % + ledgerHeader_.closeTimeResolution.count() % ledgerHeader_.closeFlags % + ripple::strHex(ledgerHeader_.accountHash) % + ripple::strHex(ledgerHeader_.txHash)); + + res = pg(ledgerInsert.data()); + if (res) + { + pg.bulkInsert("transactions", transactionsBuffer_.str()); + pg.bulkInsert("books", booksBuffer_.str()); + pg.bulkInsert("account_transactions", accountTxBuffer_.str()); + std::string objectsStr = objectsBuffer_.str(); + if (objectsStr.size()) + pg.bulkInsert("objects", objectsStr); + } + res = pg("COMMIT"); if (!res || res.status() != PGRES_COMMAND_OK) { std::stringstream msg; diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 238d5749..ed9fc5b6 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -12,7 +12,7 @@ private: mutable std::stringstream transactionsBuffer_; mutable std::stringstream booksBuffer_; mutable std::stringstream accountTxBuffer_; - mutable bool abortWrite_ = false; + mutable ripple::LedgerInfo ledgerHeader_; public: std::shared_ptr pgPool_; diff --git a/test.py b/test.py index f54e7c03..b36dabce 100755 --- a/test.py +++ b/test.py @@ -54,18 +54,26 @@ def compareTx(aldous, p2p): return True def compareAccountTx(aldous, p2p): - p2p = p2p["result"] + print(p2p) + if "result" in p2p: + p2p = p2p["result"] + maxLedger = getMinAndMax(aldous)[1] + minLedger = getMinAndMax(p2p)[0] p2pTxns = [] p2pMetas = [] p2pLedgerSequences = [] for x in p2p["transactions"]: + if int(x["ledger_index"]) > maxLedger: + continue p2pTxns.append(x["tx_blob"]) p2pMetas.append(x["meta"]) - p2pLedgerSequences.append(x["ledger_sequence"]) + p2pLedgerSequences.append(x["ledger_index"]) aldousTxns = [] aldousMetas = [] aldousLedgerSequences = [] for x in aldous["transactions"]: + if int(x["ledger_sequence"]) < minLedger: + continue aldousTxns.append(x["transaction"]) aldousMetas.append(x["metadata"]) aldousLedgerSequences.append(x["ledger_sequence"]) @@ -78,10 +86,13 @@ def compareAccountTx(aldous, p2p): aldousLedgerSequences.sort() if p2pTxns == aldousTxns and p2pMetas == aldousMetas and p2pLedgerSequences == aldousLedgerSequences: print("Responses match!!!") + print(len(aldousTxns)) + print(len(p2pTxns)) else: print("Mismatch responses") - print(aldous) - print(p2p) + print(len(aldousTxns)) + print(len(p2pTxns)) + print(maxLedger) def compareLedgerData(aldous, p2p): aldous[0].sort() @@ -129,25 +140,67 @@ def getMinAndMax(res): minSeq = None maxSeq = None for x in res["transactions"]: - seq = int(x["ledger_sequence"]) + print(x) + seq = None + if "ledger_sequence" in x: + seq = int(x["ledger_sequence"]) + else: + seq = int(x["ledger_index"]) if minSeq is None or seq < minSeq: minSeq = seq if maxSeq is None or seq > maxSeq: maxSeq = seq return (minSeq,maxSeq) +async def account_tx(ip, port, account, binary): -async def account_tx(ip, port, account, binary, minLedger,maxLedger): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: - await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"ledger_index_min":int(minLedger),"ledger_index_max":int(maxLedger)})) + await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200})) res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.ConnectionClosedError as e: print(e) +async def account_tx_full(ip, port, account, binary): + address = 'ws://' + str(ip) + ':' + str(port) + try: + cursor = None + marker = None + req = {"command":"account_tx","account":account, "binary":bool(binary),"limit":200} + results = {"transactions":[]} + numCalls = 0 + async with websockets.connect(address) as ws: + while True: + numCalls = numCalls+1 + if not cursor is None: + req["cursor"] = cursor + if not marker is None: + req["marker"] = marker + await ws.send(json.dumps(req)) + res = json.loads(await ws.recv()) + #print(json.dumps(res,indent=4,sort_keys=True)) + if "result" in res: + print(len(res["result"]["transactions"])) + else: + print(len(res["transactions"])) + if "result" in res: + results["transactions"].extend(res["result"]["transactions"]) + else: + results["transactions"].extend(res["transactions"]) + if "cursor" in res: + cursor = {"ledger_sequence":res["cursor"]["ledger_sequence"],"transaction_index":res["cursor"]["transaction_index"]} + elif "result" in res and "marker" in res["result"]: + print(res["result"]["marker"]) + marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]} + else: + break + return results + except websockets.exceptions.ConnectionClosedError as e: + print(e) + async def tx(ip, port, tx_hash, binary): address = 'ws://' + str(ip) + ':' + str(port) try: @@ -315,7 +368,7 @@ async def ledger_range(ip, port): print(e) parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range"]) +parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') parser.add_argument('--hash') @@ -332,6 +385,8 @@ parser.add_argument('--verify',default=False) parser.add_argument('--binary',default=False) parser.add_argument('--expand',default=False) parser.add_argument('--transactions',default=False) +parser.add_argument('--minLedger',default=-1) +parser.add_argument('--maxLedger',default=-1) @@ -358,11 +413,18 @@ def run(args): res = asyncio.get_event_loop().run_until_complete( account_tx(args.ip, args.port, args.account, args.binary)) if args.verify: - minMax = getMinAndMax(res) res2 = asyncio.get_event_loop().run_until_complete( - account_tx(args.ip, args.port, args.account, args.binary, minMax[0],minMax[1])) + account_tx(args.p2pIp, args.p2pPort, args.account, args.binary)) print(compareAccountTx(res,res2)) + elif args.action == "account_tx_full": + res = asyncio.get_event_loop().run_until_complete( + account_tx_full(args.ip, args.port, args.account, args.binary)) + print(len(res["transactions"])) + if args.verify: + res2 = asyncio.get_event_loop().run_until_complete( + account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary)) + print(compareAccountTx(res,res2)) elif args.action == "ledger_data": asyncio.get_event_loop().run_until_complete( ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary))