From b734884f20c8a8c4150467b15707e333d9410041 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 28 Apr 2021 20:27:52 +0000 Subject: [PATCH] various improvements around testing --- handlers/AccountInfo.cpp | 8 +- handlers/AccountTx.cpp | 15 +- handlers/BookOffers.cpp | 346 ++++++++++++++++--------------- metrics.py | 91 +++++--- test.py | 413 +++++++++++++++++++++++++++++++++++-- websocket_server_async.cpp | 3 + 6 files changed, 669 insertions(+), 207 deletions(-) diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp index 6986ac1a..34581626 100644 --- a/handlers/AccountInfo.cpp +++ b/handlers/AccountInfo.cpp @@ -73,8 +73,12 @@ doAccountInfo( if (!accountID) { - response["error"] = "couldnt decode account"; - return response; + accountID = ripple::AccountID(); + if (!accountID->parseHex(request.at("account").as_string().c_str())) + { + response["error"] = "account malformed"; + return response; + } } auto key = ripple::keylet::account(accountID.value()); diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index 8b35e293..c4d4430d 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -137,12 +137,16 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) return response; } - auto const account = ripple::parseBase58( + auto account = ripple::parseBase58( request.at("account").as_string().c_str()); if (!account) { - response["error"] = "account malformed"; - return response; + account = ripple::AccountID(); + if (!account->parseHex(request.at("account").as_string().c_str())) + { + response["error"] = "account malformed"; + return response; + } } auto ledgerSequence = ledgerSequenceFromRequest(request, backend); if (!ledgerSequence) @@ -182,8 +186,11 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) request.at("limit").kind() == boost::json::kind::int64) limit = request.at("limit").as_int64(); boost::json::array txns; + auto start = std::chrono::system_clock::now(); auto [blobs, retCursor] = backend.fetchAccountTransactions(*account, limit, cursor); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " db fetch took " << ((end - start).count() / 1000000000.0) << " num blobs = " << blobs.size(); for (auto const& txnPlusMeta : blobs) { if (txnPlusMeta.ledgerSequence > ledgerSequence) @@ -216,6 +223,8 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) cursorJson["transaction_index"] = retCursor->transactionIndex; response["cursor"] = cursorJson; } + auto end2 = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " serialization took " << ((end2 - end).count() / 1000000000.0); return response; } diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 3b93ce96..3a134b2e 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -101,196 +101,214 @@ doBookOffers( response["error"] = "Empty database"; return response; } - if (!request.contains("taker_pays")) + ripple::uint256 bookBase; + if (request.contains("book")) { - response["error"] = "Missing field taker_pays"; - return response; - } - - if (!request.contains("taker_gets")) - { - response["error"] = "Missing field taker_gets"; - return response; - } - - boost::json::object taker_pays; - if (request.at("taker_pays").kind() == boost::json::kind::object) - { - taker_pays = request.at("taker_pays").as_object(); - } - else - { - response["error"] = "Invalid field taker_pays"; - return response; - } - - boost::json::object taker_gets; - if (request.at("taker_gets").kind() == boost::json::kind::object) - { - taker_gets = request.at("taker_gets").as_object(); - } - else - { - response["error"] = "Invalid field taker_gets"; - return response; - } - - if (!taker_pays.contains("currency")) - { - response["error"] = "Missing field taker_pays.currency"; - return response; - } - - if (!taker_pays.at("currency").is_string()) - { - response["error"] = "taker_pays.currency should be string"; - return response; - } - - if (!taker_gets.contains("currency")) - { - response["error"] = "Missing field taker_gets.currency"; - return response; - } - - if (!taker_gets.at("currency").is_string()) - { - response["error"] = "taker_gets.currency should be string"; - return response; - } - - ripple::Currency pay_currency; - - if (!ripple::to_currency( - pay_currency, taker_pays.at("currency").as_string().c_str())) - { - response["error"] = - "Invalid field 'taker_pays.currency', bad currency."; - return response; - } - - ripple::Currency get_currency; - - if (!ripple::to_currency( - get_currency, taker_gets["currency"].as_string().c_str())) - { - response["error"] = - "Invalid field 'taker_gets.currency', bad currency."; - return response; - } - - ripple::AccountID pay_issuer; - - if (taker_pays.contains("issuer")) - { - if (!taker_pays.at("issuer").is_string()) + if (!bookBase.parseHex(request.at("book").as_string().c_str())) { - response["error"] = "taker_pays.issuer should be string"; - return response; - } - - if (!ripple::to_issuer( - pay_issuer, taker_pays.at("issuer").as_string().c_str())) - { - response["error"] = - "Invalid field 'taker_pays.issuer', bad issuer."; - return response; - } - - if (pay_issuer == ripple::noAccount()) - { - response["error"] = - "Invalid field 'taker_pays.issuer', bad issuer account one."; + response["error"] = "Error parsing book"; return response; } } else { - pay_issuer = ripple::xrpAccount(); - } - - if (isXRP(pay_currency) && !isXRP(pay_issuer)) - { - response["error"] = - "Unneeded field 'taker_pays.issuer' for XRP currency " - "specification."; - return response; - } - - if (!isXRP(pay_currency) && isXRP(pay_issuer)) - { - response["error"] = - "Invalid field 'taker_pays.issuer', expected non-XRP issuer."; - return response; - } - - ripple::AccountID get_issuer; - - if (taker_gets.contains("issuer")) - { - if (!taker_gets["issuer"].is_string()) + if (!request.contains("taker_pays")) { - response["error"] = "taker_gets.issuer should be string"; + response["error"] = "Missing field taker_pays"; return response; } - if (!ripple::to_issuer( - get_issuer, taker_gets.at("issuer").as_string().c_str())) + if (!request.contains("taker_gets")) + { + response["error"] = "Missing field taker_gets"; + return response; + } + + boost::json::object taker_pays; + if (request.at("taker_pays").kind() == boost::json::kind::object) + { + taker_pays = request.at("taker_pays").as_object(); + } + else + { + response["error"] = "Invalid field taker_pays"; + return response; + } + + boost::json::object taker_gets; + if (request.at("taker_gets").kind() == boost::json::kind::object) + { + taker_gets = request.at("taker_gets").as_object(); + } + else + { + response["error"] = "Invalid field taker_gets"; + return response; + } + + if (!taker_pays.contains("currency")) + { + response["error"] = "Missing field taker_pays.currency"; + return response; + } + + if (!taker_pays.at("currency").is_string()) + { + response["error"] = "taker_pays.currency should be string"; + return response; + } + + if (!taker_gets.contains("currency")) + { + response["error"] = "Missing field taker_gets.currency"; + return response; + } + + if (!taker_gets.at("currency").is_string()) + { + response["error"] = "taker_gets.currency should be string"; + return response; + } + + ripple::Currency pay_currency; + + if (!ripple::to_currency( + pay_currency, taker_pays.at("currency").as_string().c_str())) { response["error"] = - "Invalid field 'taker_gets.issuer', bad issuer."; + "Invalid field 'taker_pays.currency', bad currency."; return response; } - if (get_issuer == ripple::noAccount()) + ripple::Currency get_currency; + + if (!ripple::to_currency( + get_currency, taker_gets["currency"].as_string().c_str())) { response["error"] = - "Invalid field 'taker_gets.issuer', bad issuer account one."; + "Invalid field 'taker_gets.currency', bad currency."; return response; } - } - else - { - get_issuer = ripple::xrpAccount(); - } - if (ripple::isXRP(get_currency) && !ripple::isXRP(get_issuer)) - { - response["error"] = - "Unneeded field 'taker_gets.issuer' for XRP currency " - "specification."; - return response; - } + ripple::AccountID pay_issuer; - if (!ripple::isXRP(get_currency) && ripple::isXRP(get_issuer)) - { - response["error"] = - "Invalid field 'taker_gets.issuer', expected non-XRP issuer."; - return response; - } - - boost::optional takerID; - if (request.contains("taker")) - { - if (!request.at("taker").is_string()) + if (taker_pays.contains("issuer")) { - response["error"] = "taker should be string"; - return response; - } + if (!taker_pays.at("issuer").is_string()) + { + response["error"] = "taker_pays.issuer should be string"; + return response; + } - takerID = ripple::parseBase58( - request.at("taker").as_string().c_str()); - if (!takerID) + if (!ripple::to_issuer( + pay_issuer, taker_pays.at("issuer").as_string().c_str())) + { + response["error"] = + "Invalid field 'taker_pays.issuer', bad issuer."; + return response; + } + + if (pay_issuer == ripple::noAccount()) + { + response["error"] = + "Invalid field 'taker_pays.issuer', bad issuer account " + "one."; + return response; + } + } + else { - response["error"] = "Invalid taker"; + pay_issuer = ripple::xrpAccount(); + } + + if (isXRP(pay_currency) && !isXRP(pay_issuer)) + { + response["error"] = + "Unneeded field 'taker_pays.issuer' for XRP currency " + "specification."; return response; } - } - if (pay_currency == get_currency && pay_issuer == get_issuer) - { - response["error"] = "Bad market"; - return response; + if (!isXRP(pay_currency) && isXRP(pay_issuer)) + { + response["error"] = + "Invalid field 'taker_pays.issuer', expected non-XRP issuer."; + return response; + } + + ripple::AccountID get_issuer; + + if (taker_gets.contains("issuer")) + { + if (!taker_gets["issuer"].is_string()) + { + response["error"] = "taker_gets.issuer should be string"; + return response; + } + + if (!ripple::to_issuer( + get_issuer, taker_gets.at("issuer").as_string().c_str())) + { + response["error"] = + "Invalid field 'taker_gets.issuer', bad issuer."; + return response; + } + + if (get_issuer == ripple::noAccount()) + { + response["error"] = + "Invalid field 'taker_gets.issuer', bad issuer account " + "one."; + return response; + } + } + else + { + get_issuer = ripple::xrpAccount(); + } + + if (ripple::isXRP(get_currency) && !ripple::isXRP(get_issuer)) + { + response["error"] = + "Unneeded field 'taker_gets.issuer' for XRP currency " + "specification."; + return response; + } + + if (!ripple::isXRP(get_currency) && ripple::isXRP(get_issuer)) + { + response["error"] = + "Invalid field 'taker_gets.issuer', expected non-XRP issuer."; + return response; + } + + boost::optional takerID; + if (request.contains("taker")) + { + if (!request.at("taker").is_string()) + { + response["error"] = "taker should be string"; + return response; + } + + takerID = ripple::parseBase58( + request.at("taker").as_string().c_str()); + if (!takerID) + { + response["error"] = "Invalid taker"; + return response; + } + } + + if (pay_currency == get_currency && pay_issuer == get_issuer) + { + response["error"] = "Bad market"; + return response; + } + ripple::Book book = { + {pay_currency, pay_issuer}, {get_currency, get_issuer}}; + + bookBase = getBookBase(book); } std::uint32_t limit = 200; @@ -305,10 +323,6 @@ doBookOffers( cursor->parseHex(request.at("cursor").as_string().c_str()); } - ripple::Book book = { - {pay_currency, pay_issuer}, {get_currency, get_issuer}}; - - ripple::uint256 bookBase = getBookBase(book); auto start = std::chrono::system_clock::now(); auto [offers, retCursor] = backend.fetchBookOffers(bookBase, *ledgerSequence, limit, cursor); diff --git a/metrics.py b/metrics.py index e13eb630..4fa96bfd 100644 --- a/metrics.py +++ b/metrics.py @@ -10,6 +10,34 @@ def getTime(line): timestamp = datetime.strptime(timestampSub, '%Y-%m-%d %H:%M:%S.%f') return timestamp.timestamp() +def parseAccountTx(filename): + + + with open(filename) as f: + totalProcTime = 0.0 + totalTxnTime = 0.0 + numCalls = 0 + for line in f: + if "executed stored_procedure" in line: + idx = line.find("in ") + idx = idx + 3 + idx2 = line.find("num") + procTime = float(line[idx:idx2]) + totalProcTime += procTime + if "fetchTransactions fetched" in line: + idx = line.find("took ") + idx = idx + 5 + txnTime = float(line[idx:]) + totalTxnTime += txnTime + numCalls = numCalls + 1 + print(totalProcTime) + print(totalProcTime/numCalls) + print(totalTxnTime) + print(totalTxnTime/numCalls) + + + + def parseLogs(filename, interval): @@ -33,7 +61,9 @@ def parseLogs(filename, interval): intervalStart = 0 intervalEnd = 0 intervalLedgers = 0 + ledgersPerSecond = 0 + print("ledgers, transactions, objects, loadTime, loadTime/ledger, ledgers/sec, txns/sec, objs/sec") for line in f: if "Load phase" in line: sequenceIdx = line.find("Sequence : ") @@ -71,8 +101,6 @@ def parseLogs(filename, interval): prevEnd = end end = getTime(line) - if end - prevEnd > 3 and prevEnd != 0: - print("Caught up!") if intervalStart == 0: intervalStart = getTime(line) @@ -92,26 +120,30 @@ def parseLogs(filename, interval): if int(sequence) % interval == 0: - print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]") - print(loadTime + " : " - + txnCount + " : " - + objCount + " : " - + txnsPerSecond + " : " - + objsPerSecond) - print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ") - print(str(intervalLedgers) + " : " - + str(intervalEnd - intervalStart) + " : " - + str(intervalLedgersPerSecond) + " : " - + str(intervalLoadTime/intervalLedgers) + " : " - + str(intervalTxns/intervalTime) + " : " - + str(intervalObjs/intervalTime)) - print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]") - print(str(totalLedgers) + " : " - + str(end-start) + " : " - + str(ledgersPerSecond) + " : " - + str(totalLoadTime/totalLedgers) + " : " - + str(totalTxns/totalTime) + " : " - + str(totalObjs/totalTime)) + # print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]") + # print(loadTime + " , " + # + txnCount + " , " + # + objCount + " , " + # + txnsPerSecond + " , " + # + objsPerSecond) + # print("Interval Aggregate ( " + str(interval) + " ) [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ") + print(str(intervalLedgers) + " , " + + str(intervalTxns) + " , " + + str(intervalObjs) + " , " + + str(intervalLoadTime) + " , " + + str(intervalLoadTime/intervalLedgers) + " , " + + str(intervalLedgers/intervalLoadTime) + " , " + + str(intervalTxns/intervalLoadTime) + " , " + + str(intervalObjs/intervalLoadTime)) + # print("Total Aggregate: [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]") + # print(str(totalLedgers) + " , " + # + str(totalTxns) + " , " + # + str(totalObjs) + " , " + # + str(end-start) + " , " + # + str(ledgersPerSecond) + " , " + # + str(totalLoadTime/totalLedgers) + " , " + # + str(totalTxns/totalTime) + " , " + # + str(totalObjs/totalTime)) if int(sequence) % interval == 0: intervalTime = 0 intervalTxns = 0 @@ -120,6 +152,15 @@ def parseLogs(filename, interval): intervalEnd = 0 intervalLedgers = 0 intervalLoadTime = 0 + print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]") + print(totalLedgers) + print(totalLoadTime) + print(str(totalLedgers) + " : " + + str(end-start) + " : " + + str(ledgersPerSecond) + " : " + + str(totalLoadTime/totalLedgers) + " : " + + str(totalTxns/totalTime) + " : " + + str(totalObjs/totalTime)) @@ -127,10 +168,14 @@ def parseLogs(filename, interval): parser = argparse.ArgumentParser(description='parses logs') parser.add_argument("--filename") parser.add_argument("--interval",default=100000) +parser.add_argument("--account_tx",default=False) args = parser.parse_args() def run(args): - parseLogs(args.filename, int(args.interval)) + if args.account_tx: + parseAccountTx(args.filename) + else: + parseLogs(args.filename, int(args.interval)) run(args) diff --git a/test.py b/test.py index 82035bad..c8ce5c78 100755 --- a/test.py +++ b/test.py @@ -95,6 +95,47 @@ def compareAccountTx(aldous, p2p): print(len(p2p["transactions"])) print(maxLedger) +def getAccounts(filename): + accounts = [] + with open(filename) as f: + for line in f: + if line[0] == "{": + jv = json.loads(line) + accounts.append(jv["Account"]) + if len(line) == 35: + accounts.append(line[0:34]) + if len(line) == 44: + accounts.append(line[3:43]) + if len(line) == 65: + accounts.append(line[0:64]) + if len(line) == 41 or len(line) == 40: + accounts.append(line[0:40]) + elif len(line) == 43: + accounts.append(line[2:42]) + return accounts +def getAccountsAndCursors(filename): + accounts = [] + cursors = [] + with open(filename) as f: + for line in f: + if len(line) == 0: + continue + space = line.find(" ") + cursor = line[space+1:len(line)-1] + if cursor == "None": + cursors.append(None) + else: + cursors.append(json.loads(cursor)) + accounts.append(line[0:space]) + + return (accounts,cursors) +def getBooks(filename): + books = [] + with open(filename) as f: + for line in f: + if len(line) == 68: + books.append(line[3:67]) + return books def compareLedgerData(aldous, p2p): aldous[0].sort() aldous[1].sort() @@ -118,6 +159,24 @@ def compareLedgerData(aldous, p2p): +async def account_infos(ip, port, accounts, numCalls): + + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(accounts)) + for x in range(0,numCalls): + account = accounts[random.randrange(0,len(accounts))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"account_info","account":account,"binary":True})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + + except websockets.exceptions.connectionclosederror as e: + print(e) async def account_info(ip, port, account, ledger, binary): @@ -165,10 +224,88 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None): await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger})) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) + #print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.ConnectionClosedError as e: print(e) +async def account_txs_full(ip, port, accounts, cursors, numCalls, limit): + + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(accounts)) + cursor = None + account = None + time = 0.0 + for x in range(0,numCalls): + + idx = random.randrange(0,len(accounts)) + account = accounts[idx] + cursor = cursors[idx] + start = datetime.datetime.now().timestamp() + if cursor is None: + await ws.send(json.dumps({"command":"account_tx","account":account,"binary":True,"limit":limit})) + else: + marker = {} + marker["ledger"] = cursor["ledger_sequence"] + marker["seq"] = cursor["transaction_index"] + await ws.send(json.dumps({"command":"account_tx","account":account,"cursor":cursor,"marker":marker,"binary":True,"limit":limit,"forward":False})) + + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + print(end-start) + time += (end - start) + txns = [] + if "result" in res: + txns = res["result"]["transactions"] + else: + txns = res["transactions"] + print(len(txns)) + print(account + " " + json.dumps(cursor)) + if (end - start) > 0.1: + print("request took more than 100ms") + print("Latency = " + str(time / numCalls)) + + except websockets.exceptions.connectionclosederror as e: + print(e) +async def account_txs(ip, port, accounts, numCalls): + + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(accounts)) + cursor = None + account = None + for x in range(0,numCalls): + + if cursor is None: + account = accounts[random.randrange(0,len(accounts))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"account_tx","account":account,"binary":True,"limit":200})) + else: + await ws.send(json.dumps({"command":"account_tx","account":account,"cursor":cursor,"binary":True,"limit":200})) + + + res = json.loads(await ws.recv()) + if "cursor" in res: + if cursor: + print(account + " " + json.dumps(cursor)) + else: + print(account + " " + "None") + #cursor = res["cursor"] + elif cursor: + print(account + " " + json.dumps(cursor)) + cursor = None + + + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + + except websockets.exceptions.connectionclosederror as e: + print(e) async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None): address = 'ws://' + str(ip) + ':' + str(port) @@ -188,8 +325,14 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No if minLedger is not None and maxLedger is not None: req["ledger_index_min"] = minLedger req["ledger_index_max"] = maxLedger + start = datetime.datetime.now().timestamp() await ws.send(json.dumps(req)) - res = json.loads(await ws.recv()) + res = await ws.recv() + + end = datetime.datetime.now().timestamp() + + print(end - start) + res = json.loads(res) #print(json.dumps(res,indent=4,sort_keys=True)) if "result" in res: print(len(res["result"]["transactions"])) @@ -221,6 +364,20 @@ async def tx(ip, port, tx_hash, binary): return res except websockets.exceptions.connectionclosederror as e: print(e) +async def txs(ip, port, hashes, numCalls): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + for x in range(0,numCalls): + h = hashes[random.randrange(0,len(hashes))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"tx","transaction":h,"binary":True})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + except websockets.exceptions.connectionclosederror as e: + print(e) async def ledger_entry(ip, port, index, ledger, binary): address = 'ws://' + str(ip) + ':' + str(port) @@ -231,6 +388,23 @@ async def ledger_entry(ip, port, index, ledger, binary): print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.connectionclosederror as e: print(e) +async def ledger_entries(ip, port, ledger, keys, numCalls): + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address) as ws: + print(len(keys)) + for x in range(0,numCalls): + index = keys[random.randrange(0,len(keys))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"ledger_entry","index":index,"binary":True,"ledger_index":int(ledger)})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + + except websockets.exceptions.connectionclosederror as e: + print(e) async def ledger_data(ip, port, ledger, limit, binary, cursor): @@ -268,12 +442,12 @@ def writeLedgerData(data,filename): f.write('\n') -async def ledger_data_full(ip, port, ledger, binary, limit): +async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1): address = 'ws://' + str(ip) + ':' + str(port) try: blobs = [] keys = [] - async with websockets.connect(address) as ws: + async with websockets.connect(address,max_size=1000000000) as ws: if int(limit) < 2048: limit = 2048 marker = None @@ -287,6 +461,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit): await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"cursor":marker, "marker":marker,"binary":bool(binary), "limit":int(limit)})) res = json.loads(await ws.recv()) + if "error" in res: print(res) @@ -298,11 +473,23 @@ async def ledger_data_full(ip, port, ledger, binary, limit): else: objects = res["objects"] for x in objects: - blobs.append(x["data"]) - keys.append(x["index"]) + if binary: + if typ is None or x["data"][2:6] == typ: + print(json.dumps(x)) + blobs.append(x["data"]) + keys.append(x["index"]) + else: + if typ is None or x["LedgerEntryType"] == typ: + print(json.dumps(x)) + blobs.append(x) + keys.append(x["index"]) + if limit != -1 and len(keys) > count: + print("stopping early") + print(len(keys)) + print("done") + return (keys,blobs) if "cursor" in res: marker = res["cursor"] - print(marker) elif "result" in res and "marker" in res["result"]: marker = res["result"]["marker"] print(marker) @@ -345,6 +532,25 @@ def compare_book_offers(aldous, p2p): return True +async def book_offerses(ip, port, ledger, books, numCalls): + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(books)) + for x in range(0,numCalls): + book = books[random.randrange(0,len(books))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"book_offers","book":book,"binary":True})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + print(book) + print(len(res["offers"])) + if (end - start) > 0.1: + print("request took more than 100ms") + + except websockets.exceptions.connectionclosederror as e: + print(e) async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary): @@ -415,9 +621,16 @@ def compareLedger(aldous, p2p): print(aldous) print(p2p) +def getHashesFromFile(filename): + hashes = [] + with open(filename) as f: + for line in f: + if len(line) == 65: + hashes.append(line[0:64]) + return hashes + def getHashes(res): - print(json.dumps(res,indent=4,sort_keys=True)) if "result" in res: res = res["result"]["ledger"] @@ -431,16 +644,64 @@ def getHashes(res): hashes.append(x) return hashes +import random +import datetime +numCalls = 0 +async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls): + global numCalls + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + ledger = 0 + try: + async with websockets.connect(address,max_size=1000000000) as ws: + global numCalls + for i in range(0, maxCalls): + + + ledger = random.randrange(minLedger,maxLedger) + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":True, "transactions":bool(transactions),"expand":bool(expand)})) + res = json.loads(await ws.recv()) + #print(res["header"]["blob"]) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + numCalls = numCalls + 1 + + except websockets.exceptions.ConnectionClosedError as e: + print(e) + print(ledger) + +async def getManyHashes(ip, port, minLedger,maxLedger): + + hashes = [] + for x in range(minLedger,maxLedger): + res = await ledger(ip, port, x,True, True, False) + hashes.extend(getHashes(res)) + print(len(hashes)) + return hashes +async def getManyHashes(ip, port, minLedger,maxLedger, numHashes): + + random.seed() + hashes = [] + while len(hashes) < numHashes: + + lgr = random.randrange(minLedger,maxLedger) + res = await ledger(ip, port, lgr,True, True, False) + hashes.extend(getHashes(res)) + print(len(hashes)) + return hashes + + async def ledger(ip, port, ledger, binary, transactions, expand): address = 'ws://' + str(ip) + ':' + str(port) try: - async with websockets.connect(address) as ws: + async with websockets.connect(address,max_size=1000000000) as ws: await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) - print(bool(binary)) + #print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.connectionclosederror as e: @@ -456,6 +717,7 @@ async def ledger_range(ip, port): if "error" in res: await ws.send(json.dumps({"command":"server_info"})) res = json.loads(await ws.recv()) + print(res) rng = res["result"]["info"]["complete_ledgers"] idx = rng.find("-") return (int(rng[0:idx]),int(rng[idx+1:-1])) @@ -464,8 +726,10 @@ async def ledger_range(ip, port): except websockets.exceptions.connectionclosederror as e: print(e) + parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry"]) +parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses"]) + parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') parser.add_argument('--hash') @@ -486,7 +750,11 @@ parser.add_argument('--minLedger',default=-1) parser.add_argument('--maxLedger',default=-1) parser.add_argument('--filename',default=None) parser.add_argument('--index') +parser.add_argument('--type',default=None) parser.add_argument('--cursor',default='0000000000000000000000000000000000000000000000000000000000000000') +parser.add_argument('--numCalls',default=10000) +parser.add_argument('--numRunners',default=1) +parser.add_argument('--count',default=-1) @@ -504,6 +772,124 @@ def run(args): res2 = asyncio.get_event_loop().run_until_complete( account_info(args.p2pIp, args.p2pPort, args.account, args.ledger, args.binary)) print(compareAccountInfo(res1,res2)) + elif args.action == "txs": + #hashes = asyncio.get_event_loop().run_until_complete(getManyHashes(args.ip,args.port, int(args.minLedger),int(args.maxLedger), int(args.numCalls))) + #for x in hashes: + # print(x) + #return + hashes = getHashesFromFile(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(txs(args.ip, args.port, hashes,int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "ledgers": + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(ledgers(args.ip, args.port, int(args.minLedger), int(args.maxLedger), args.transactions, args.expand, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "ledger_entries": + keys = [] + ledger_index = 0 + with open(args.filename) as f: + i = 0 + for line in f: + if ledger_index == 0: + ledger_index = int(line) + elif len(line) == 65: + keys.append(line[0:64]) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(ledger_entries(args.ip, args.port, ledger_index,keys, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "account_txs": + accounts = getAccounts(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(account_txs(args.ip, args.port,accounts, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "account_txs_full": + accounts,cursors = getAccountsAndCursors(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(account_txs_full(args.ip, args.port,accounts,cursors,int(args.numCalls), int(args.limit)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + print("Latency = " + str((end - start) / int(args.numCalls)) + " seconds") + elif args.action == "account_infos": + accounts = getAccounts(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(account_infos(args.ip, args.port,accounts, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + + elif args.action == "book_offerses": + books = getBooks(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(book_offerses(args.ip, args.port,int(args.ledger),books, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") elif args.action == "ledger_entry": asyncio.get_event_loop().run_until_complete( ledger_entry(args.ip, args.port, args.index, args.ledger, args.binary)) @@ -565,7 +951,8 @@ def run(args): args.filename = str(args.port) + "." + str(args.ledger) res = asyncio.get_event_loop().run_until_complete( - ledger_data_full(args.ip, args.port, args.ledger, args.binary, args.limit)) + ledger_data_full(args.ip, args.port, args.ledger, bool(args.binary), args.limit,args.type, int(args.count))) + print(len(res[0])) if args.verify: writeLedgerData(res,args.filename) diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 3c196ff8..1b4d65e3 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -224,7 +224,10 @@ public: BOOST_LOG_TRIVIAL(debug) << " received request : " << request; try { + auto start = std::chrono::system_clock::now(); response = buildResponse(request, backend_); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " RPC call took " << ((end - start).count() / 1000000000.0) << " . request = " << request; } catch (Backend::DatabaseTimeout const& t) {