diff --git a/metrics.py b/metrics.py index 413ab938..16d1be9f 100644 --- a/metrics.py +++ b/metrics.py @@ -1,17 +1,36 @@ #!/usr/bin/python3 import argparse -def parseLogs(filename): +from datetime import datetime + +def getTime(line): + bracketOpen = line.find("[") + bracketClose = line.find("]") + timestampSub = line[bracketOpen+1:bracketClose] + timestamp = datetime.strptime(timestampSub, '%Y-%m-%d %H:%M:%S.%f') + return timestamp.timestamp() + + +def parseLogs(filename, interval): with open(filename) as f: totalTime = 0 totalTxns = 0 totalObjs = 0 - - milTime = 0 - milTxns = 0 - milObjs = 0 + + + start = 0 + end = 0 + totalLedgers = 0 + + intervalTime = 0 + intervalTxns = 0 + intervalObjs = 0 + + intervalStart = 0 + intervalEnd = 0 + intervalLedgers = 0 for line in f: if "Load phase" in line: @@ -36,34 +55,73 @@ def parseLogs(filename): totalTime += float(loadTime); totalTxns += float(txnCount) totalObjs += float(objCount) - milTime += float(loadTime) - milTxns += float(txnCount) - milObjs += float(objCount) - if int(sequence) % 1000000 == 0: - print("This million: ") - print(str(milTxns/milTime) + " : " + str(milObjs/milTime)) - milTime = 0 - milTxns = 0 - milObjs - 0 + intervalTime += float(loadTime) + intervalTxns += float(txnCount) + intervalObjs += float(objCount) + + + + if start == 0: + start = getTime(line) + + + prevEnd = end + end = getTime(line) + if end - prevEnd > 3 and prevEnd != 0: + print("Caught up!") + + if intervalStart == 0: + intervalStart = getTime(line) + + intervalEnd = getTime(line) + + totalLedgers+=1 + intervalLedgers+=1 + ledgersPerSecond = 0 + if end != start: + ledgersPerSecond = float(totalLedgers) / float((end - start)) + intervalLedgersPerSecond = 0 + if intervalEnd != intervalStart: + intervalLedgersPerSecond = float(intervalLedgers) / float((intervalEnd - intervalStart)) + + print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]") - print(loadTime + " : " + txnCount + " : " + objCount + " : " + txnsPerSecond + " : " + objsPerSecond) - print("Aggregate: [txPerSec, objsPerSec]") - print(str(totalTxns/totalTime) + " : " + str(totalObjs/totalTime)) + print(loadTime + " : " + + txnCount + " : " + + objCount + " : " + + txnsPerSecond + " : " + + objsPerSecond) + print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, txPerSec, objsPerSec]: ") + print(str(intervalLedgers) + " : " + + str(intervalEnd - intervalStart) + " : " + + str(intervalLedgersPerSecond) + " : " + + str(intervalTxns/intervalTime) + " : " + + str(intervalObjs/intervalTime)) + print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, txPerSec, objsPerSec]") + print(str(totalLedgers) + " : " + + str(end-start) + " : " + + str(ledgersPerSecond) + " : " + + str(totalTxns/totalTime) + " : " + + str(totalObjs/totalTime)) + if int(sequence) % interval == 0: + intervalTime = 0 + intervalTxns = 0 + intervalObjs = 0 + intervalStart = 0 + intervalEnd = 0 + intervalLedgers = 0 - print("Last million: [txPerSec, objPerSec]") - print(str(milTxns/milTime) + " : " + str(milObjs/milTime)) - print("Totals : [txnPerSec, objPerSec]") - print(str(totalTxns/totalTime) + " : " + str(totalObjs/totalTime)) parser = argparse.ArgumentParser(description='parses logs') parser.add_argument("--filename") +parser.add_argument("--interval",default=100000) args = parser.parse_args() def run(args): - parseLogs(args.filename) + parseLogs(args.filename, int(args.interval)) run(args) diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index 8a6ff90a..c98d2705 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -441,7 +441,7 @@ public: call(stub, cq); } - BOOST_LOG_TRIVIAL(info) << "Writing objects"; + BOOST_LOG_TRIVIAL(trace) << "Writing objects"; for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects())) { std::optional book; @@ -465,7 +465,7 @@ public: false, std::move(book)); } - BOOST_LOG_TRIVIAL(info) << "Wrote objects"; + BOOST_LOG_TRIVIAL(trace) << "Wrote objects"; return more ? CallStatus::MORE : CallStatus::DONE; } diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 6def1213..6b932489 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -246,8 +246,8 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx) std::pair ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) { - BOOST_LOG_TRIVIAL(info) << __func__ << " : " - << "Beginning ledger update"; + BOOST_LOG_TRIVIAL(trace) << __func__ << " : " + << "Beginning ledger update"; ripple::LedgerInfo lgrInfo = deserializeHeader(ripple::makeSlice(rawData.ledger_header())); diff --git a/test.py b/test.py index 8b88a3b0..7b014ef2 100755 --- a/test.py +++ b/test.py @@ -91,7 +91,9 @@ def compareAccountTx(aldous, p2p): else: print("Mismatch responses") print(len(aldousTxns)) + print(len(aldous["transactions"])) print(len(p2pTxns)) + print(len(p2p["transactions"])) print(maxLedger) def compareLedgerData(aldous, p2p): @@ -152,19 +154,24 @@ def getMinAndMax(res): maxSeq = seq return (minSeq,maxSeq) -async def account_tx(ip, port, account, binary): + +async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: - await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200})) + if minLedger is None or maxLedger is None: + await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200})) + else: + await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger})) + res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.ConnectionClosedError as e: print(e) -async def account_tx_full(ip, port, account, binary): +async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None): address = 'ws://' + str(ip) + ':' + str(port) try: cursor = None @@ -179,6 +186,9 @@ async def account_tx_full(ip, port, account, binary): req["cursor"] = cursor if not marker is None: req["marker"] = marker + if minLedger is not None and maxLedger is not None: + req["ledger_index_min"] = minLedger + req["ledger_index_max"] = maxLedger await ws.send(json.dumps(req)) res = json.loads(await ws.recv()) #print(json.dumps(res,indent=4,sort_keys=True)) @@ -433,9 +443,11 @@ async def ledger_range(ip, port): if "error" in res: await ws.send(json.dumps({"command":"server_info"})) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) - return res["result"]["info"]["validated_ledger"]["seq"] - return res["ledger_index_max"] + rng = res["result"]["info"]["complete_ledgers"] + idx = rng.find("-") + return (int(rng[0:idx]),int(rng[idx+1:-1])) + + return (res["ledger_index_min"],res["ledger_index_max"]) except websockets.exceptions.connectionclosederror as e: print(e) @@ -459,7 +471,7 @@ parser.add_argument('--expand',default=False) parser.add_argument('--transactions',default=False) parser.add_argument('--minLedger',default=-1) parser.add_argument('--maxLedger',default=-1) -parser.add_argument('--filename') +parser.add_argument('--filename',default=None) parser.add_argument('--index') @@ -470,7 +482,7 @@ args = parser.parse_args() def run(args): asyncio.set_event_loop(asyncio.new_event_loop()) if(args.ledger is None): - args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)); + args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] if args.action == "account_info": res1 = asyncio.get_event_loop().run_until_complete( account_info(args.ip, args.port, args.account, args.ledger, args.binary)) @@ -487,19 +499,27 @@ def run(args): asyncio.get_event_loop().run_until_complete( tx(args.ip, args.port, args.hash, args.binary)) elif args.action == "account_tx": + if args.verify: + args.binary=True + + rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) res = asyncio.get_event_loop().run_until_complete( account_tx(args.ip, args.port, args.account, args.binary)) + if args.verify: res2 = asyncio.get_event_loop().run_until_complete( - account_tx(args.p2pIp, args.p2pPort, args.account, args.binary)) + account_tx(args.p2pIp, args.p2pPort, args.account, args.binary,rng[0],rng[1])) print(compareAccountTx(res,res2)) elif args.action == "account_tx_full": + if args.verify: + args.binary=True + rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) res = asyncio.get_event_loop().run_until_complete( account_tx_full(args.ip, args.port, args.account, args.binary)) print(len(res["transactions"])) if args.verify: res2 = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary)) + account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1])) print(compareAccountTx(res,res2)) elif args.action == "ledger_data": @@ -508,6 +528,12 @@ def run(args): if args.verify: writeLedgerData(res,args.filename) elif args.action == "ledger_data_full": + if args.verify: + args.limit = 2048 + args.binary = True + if args.filename is None: + args.filename = str(args.port) + "." + str(args.ledger) + res = asyncio.get_event_loop().run_until_complete( ledger_data_full(args.ip, args.port, args.ledger, args.binary, args.limit)) if args.verify: @@ -515,6 +541,10 @@ def run(args): elif args.action == "ledger": + if args.verify: + args.binary = True + args.transactions = True + args.expand = True res = asyncio.get_event_loop().run_until_complete( ledger(args.ip, args.port, args.ledger, args.binary, args.transactions, args.expand)) if args.verify: @@ -526,6 +556,8 @@ def run(args): asyncio.get_event_loop().run_until_complete( ledger_range(args.ip, args.port)) elif args.action == "book_offers": + if args.verify: + args.binary=True res = asyncio.get_event_loop().run_until_complete( book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary)) if args.verify: