fix race condition in Postgres backend. add cursor to account_tx response. test script verifies account_tx

This commit is contained in:
CJ Cobb
2021-03-11 13:14:21 -05:00
parent b340f24e2c
commit d62f7accfe
4 changed files with 115 additions and 56 deletions

View File

@@ -209,7 +209,13 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend)
txns.push_back(obj); txns.push_back(obj);
} }
response["transactions"] = txns; response["transactions"] = txns;
response["cursor"] = {}; if (retCursor)
{
boost::json::object cursorJson;
cursorJson["ledger_sequence"] = retCursor->ledgerSequence;
cursorJson["transaction_index"] = retCursor->transactionIndex;
response["cursor"] = cursorJson;
}
return response; return response;
} }

View File

@@ -12,35 +12,13 @@ PostgresBackend::writeLedger(
std::string&& ledgerHeader, std::string&& ledgerHeader,
bool isFirst) const bool isFirst) const
{ {
PgQuery pgQuery(pgPool_); ledgerHeader_ = ledgerInfo;
BOOST_LOG_TRIVIAL(debug) << __func__;
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
auto ledgerInsert = boost::str(
cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) %
ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() %
ledgerInfo.closeTime.time_since_epoch().count() %
ledgerInfo.parentCloseTime.time_since_epoch().count() %
ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags %
ripple::strHex(ledgerInfo.accountHash) %
ripple::strHex(ledgerInfo.txHash));
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< " : "
<< "query string = " << ledgerInsert;
auto res = pgQuery(ledgerInsert.data());
abortWrite_ = !res;
} }
void void
PostgresBackend::writeAccountTransactions( PostgresBackend::writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const std::vector<AccountTransactionsData>&& data) const
{ {
if (abortWrite_)
return;
PgQuery pg(pgPool_); PgQuery pg(pgPool_);
for (auto const& record : data) for (auto const& record : data)
{ {
@@ -64,8 +42,6 @@ PostgresBackend::writeLedgerObject(
bool isDeleted, bool isDeleted,
std::optional<ripple::uint256>&& book) const std::optional<ripple::uint256>&& book) const
{ {
if (abortWrite_)
return;
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t' objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x" << std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(blob) << '\n'; << ripple::strHex(blob) << '\n';
@@ -94,8 +70,6 @@ PostgresBackend::writeTransaction(
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const std::string&& metadata) const
{ {
if (abortWrite_)
return;
transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t' transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x" << std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(transaction) << '\t' << "\\\\x" << ripple::strHex(transaction) << '\t' << "\\\\x"
@@ -476,9 +450,12 @@ PostgresBackend::fetchAccountTransactions(
"account_transactions WHERE account = " "account_transactions WHERE account = "
<< "\'\\x" << ripple::strHex(account) << "\'"; << "\'\\x" << ripple::strHex(account) << "\'";
if (cursor) if (cursor)
sql << " AND ledger_seq < " << cursor->ledgerSequence sql << " AND (ledger_seq < " << cursor->ledgerSequence
<< " AND transaction_index < " << cursor->transactionIndex; << " OR (ledger_seq = " << cursor->ledgerSequence
<< " AND transaction_index < " << cursor->transactionIndex << "))";
sql << " ORDER BY ledger_seq DESC, transaction_index DESC";
sql << " LIMIT " << std::to_string(limit); sql << " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 3)) if (size_t numRows = checkResult(res, 3))
{ {
@@ -515,6 +492,12 @@ PostgresBackend::close()
void void
PostgresBackend::startWrites() const PostgresBackend::startWrites() const
{
numRowsInObjectsBuffer_ = 0;
}
bool
PostgresBackend::finishWrites() const
{ {
PgQuery pg(pgPool_); PgQuery pg(pgPool_);
auto res = pg("BEGIN"); auto res = pg("BEGIN");
@@ -524,22 +507,30 @@ PostgresBackend::startWrites() const
msg << "Postgres error creating transaction: " << res.msg(); msg << "Postgres error creating transaction: " << res.msg();
throw std::runtime_error(msg.str()); throw std::runtime_error(msg.str());
} }
numRowsInObjectsBuffer_ = 0; auto cmd = boost::format(
} R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
bool auto ledgerInsert = boost::str(
PostgresBackend::finishWrites() const cmd % ledgerHeader_.seq % ripple::strHex(ledgerHeader_.hash) %
{ ripple::strHex(ledgerHeader_.parentHash) % ledgerHeader_.drops.drops() %
if (abortWrite_) ledgerHeader_.closeTime.time_since_epoch().count() %
return false; ledgerHeader_.parentCloseTime.time_since_epoch().count() %
PgQuery pg(pgPool_); ledgerHeader_.closeTimeResolution.count() % ledgerHeader_.closeFlags %
ripple::strHex(ledgerHeader_.accountHash) %
ripple::strHex(ledgerHeader_.txHash));
res = pg(ledgerInsert.data());
if (res)
{
pg.bulkInsert("transactions", transactionsBuffer_.str()); pg.bulkInsert("transactions", transactionsBuffer_.str());
pg.bulkInsert("books", booksBuffer_.str()); pg.bulkInsert("books", booksBuffer_.str());
pg.bulkInsert("account_transactions", accountTxBuffer_.str()); pg.bulkInsert("account_transactions", accountTxBuffer_.str());
std::string objectsStr = objectsBuffer_.str(); std::string objectsStr = objectsBuffer_.str();
if (objectsStr.size()) if (objectsStr.size())
pg.bulkInsert("objects", objectsStr); pg.bulkInsert("objects", objectsStr);
auto res = pg("COMMIT"); }
res = pg("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK) if (!res || res.status() != PGRES_COMMAND_OK)
{ {
std::stringstream msg; std::stringstream msg;

View File

@@ -12,7 +12,7 @@ private:
mutable std::stringstream transactionsBuffer_; mutable std::stringstream transactionsBuffer_;
mutable std::stringstream booksBuffer_; mutable std::stringstream booksBuffer_;
mutable std::stringstream accountTxBuffer_; mutable std::stringstream accountTxBuffer_;
mutable bool abortWrite_ = false; mutable ripple::LedgerInfo ledgerHeader_;
public: public:
std::shared_ptr<PgPool> pgPool_; std::shared_ptr<PgPool> pgPool_;

78
test.py
View File

@@ -54,18 +54,26 @@ def compareTx(aldous, p2p):
return True return True
def compareAccountTx(aldous, p2p): def compareAccountTx(aldous, p2p):
print(p2p)
if "result" in p2p:
p2p = p2p["result"] p2p = p2p["result"]
maxLedger = getMinAndMax(aldous)[1]
minLedger = getMinAndMax(p2p)[0]
p2pTxns = [] p2pTxns = []
p2pMetas = [] p2pMetas = []
p2pLedgerSequences = [] p2pLedgerSequences = []
for x in p2p["transactions"]: for x in p2p["transactions"]:
if int(x["ledger_index"]) > maxLedger:
continue
p2pTxns.append(x["tx_blob"]) p2pTxns.append(x["tx_blob"])
p2pMetas.append(x["meta"]) p2pMetas.append(x["meta"])
p2pLedgerSequences.append(x["ledger_sequence"]) p2pLedgerSequences.append(x["ledger_index"])
aldousTxns = [] aldousTxns = []
aldousMetas = [] aldousMetas = []
aldousLedgerSequences = [] aldousLedgerSequences = []
for x in aldous["transactions"]: for x in aldous["transactions"]:
if int(x["ledger_sequence"]) < minLedger:
continue
aldousTxns.append(x["transaction"]) aldousTxns.append(x["transaction"])
aldousMetas.append(x["metadata"]) aldousMetas.append(x["metadata"])
aldousLedgerSequences.append(x["ledger_sequence"]) aldousLedgerSequences.append(x["ledger_sequence"])
@@ -78,10 +86,13 @@ def compareAccountTx(aldous, p2p):
aldousLedgerSequences.sort() aldousLedgerSequences.sort()
if p2pTxns == aldousTxns and p2pMetas == aldousMetas and p2pLedgerSequences == aldousLedgerSequences: if p2pTxns == aldousTxns and p2pMetas == aldousMetas and p2pLedgerSequences == aldousLedgerSequences:
print("Responses match!!!") print("Responses match!!!")
print(len(aldousTxns))
print(len(p2pTxns))
else: else:
print("Mismatch responses") print("Mismatch responses")
print(aldous) print(len(aldousTxns))
print(p2p) print(len(p2pTxns))
print(maxLedger)
def compareLedgerData(aldous, p2p): def compareLedgerData(aldous, p2p):
aldous[0].sort() aldous[0].sort()
@@ -129,25 +140,67 @@ def getMinAndMax(res):
minSeq = None minSeq = None
maxSeq = None maxSeq = None
for x in res["transactions"]: for x in res["transactions"]:
print(x)
seq = None
if "ledger_sequence" in x:
seq = int(x["ledger_sequence"]) seq = int(x["ledger_sequence"])
else:
seq = int(x["ledger_index"])
if minSeq is None or seq < minSeq: if minSeq is None or seq < minSeq:
minSeq = seq minSeq = seq
if maxSeq is None or seq > maxSeq: if maxSeq is None or seq > maxSeq:
maxSeq = seq maxSeq = seq
return (minSeq,maxSeq) return (minSeq,maxSeq)
async def account_tx(ip, port, account, binary):
async def account_tx(ip, port, account, binary, minLedger,maxLedger):
address = 'ws://' + str(ip) + ':' + str(port) address = 'ws://' + str(ip) + ':' + str(port)
try: try:
async with websockets.connect(address) as ws: async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"ledger_index_min":int(minLedger),"ledger_index_max":int(maxLedger)})) await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200}))
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True)) print(json.dumps(res,indent=4,sort_keys=True))
return res return res
except websockets.exceptions.ConnectionClosedError as e: except websockets.exceptions.ConnectionClosedError as e:
print(e) print(e)
async def account_tx_full(ip, port, account, binary):
address = 'ws://' + str(ip) + ':' + str(port)
try:
cursor = None
marker = None
req = {"command":"account_tx","account":account, "binary":bool(binary),"limit":200}
results = {"transactions":[]}
numCalls = 0
async with websockets.connect(address) as ws:
while True:
numCalls = numCalls+1
if not cursor is None:
req["cursor"] = cursor
if not marker is None:
req["marker"] = marker
await ws.send(json.dumps(req))
res = json.loads(await ws.recv())
#print(json.dumps(res,indent=4,sort_keys=True))
if "result" in res:
print(len(res["result"]["transactions"]))
else:
print(len(res["transactions"]))
if "result" in res:
results["transactions"].extend(res["result"]["transactions"])
else:
results["transactions"].extend(res["transactions"])
if "cursor" in res:
cursor = {"ledger_sequence":res["cursor"]["ledger_sequence"],"transaction_index":res["cursor"]["transaction_index"]}
elif "result" in res and "marker" in res["result"]:
print(res["result"]["marker"])
marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]}
else:
break
return results
except websockets.exceptions.ConnectionClosedError as e:
print(e)
async def tx(ip, port, tx_hash, binary): async def tx(ip, port, tx_hash, binary):
address = 'ws://' + str(ip) + ':' + str(port) address = 'ws://' + str(ip) + ':' + str(port)
try: try:
@@ -315,7 +368,7 @@ async def ledger_range(ip, port):
print(e) print(e)
parser = argparse.ArgumentParser(description='test script for xrpl-reporting') parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range"]) parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range"])
parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080') parser.add_argument('--port', default='8080')
parser.add_argument('--hash') parser.add_argument('--hash')
@@ -332,6 +385,8 @@ parser.add_argument('--verify',default=False)
parser.add_argument('--binary',default=False) parser.add_argument('--binary',default=False)
parser.add_argument('--expand',default=False) parser.add_argument('--expand',default=False)
parser.add_argument('--transactions',default=False) parser.add_argument('--transactions',default=False)
parser.add_argument('--minLedger',default=-1)
parser.add_argument('--maxLedger',default=-1)
@@ -358,11 +413,18 @@ def run(args):
res = asyncio.get_event_loop().run_until_complete( res = asyncio.get_event_loop().run_until_complete(
account_tx(args.ip, args.port, args.account, args.binary)) account_tx(args.ip, args.port, args.account, args.binary))
if args.verify: if args.verify:
minMax = getMinAndMax(res)
res2 = asyncio.get_event_loop().run_until_complete( res2 = asyncio.get_event_loop().run_until_complete(
account_tx(args.ip, args.port, args.account, args.binary, minMax[0],minMax[1])) account_tx(args.p2pIp, args.p2pPort, args.account, args.binary))
print(compareAccountTx(res,res2)) print(compareAccountTx(res,res2))
elif args.action == "account_tx_full":
res = asyncio.get_event_loop().run_until_complete(
account_tx_full(args.ip, args.port, args.account, args.binary))
print(len(res["transactions"]))
if args.verify:
res2 = asyncio.get_event_loop().run_until_complete(
account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary))
print(compareAccountTx(res,res2))
elif args.action == "ledger_data": elif args.action == "ledger_data":
asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary)) ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary))