diff --git a/metrics.py b/metrics.py
index 16d1be9f..d642ecf4 100644
--- a/metrics.py
+++ b/metrics.py
@@ -11,13 +11,14 @@ def getTime(line):
     return timestamp.timestamp()
 
-def parseLogs(filename, interval):
+def parseLogs(filename, interval, minTxnCount=0):
     with open(filename) as f:
         totalTime = 0
         totalTxns = 0
         totalObjs = 0
+        totalLoadTime = 0
 
         start = 0
@@ -27,6 +28,7 @@ def parseLogs(filename, interval):
         intervalTime = 0
         intervalTxns = 0
         intervalObjs = 0
+        intervalLoadTime = 0
 
         intervalStart = 0
         intervalEnd = 0
@@ -52,13 +54,16 @@ def parseLogs(filename, interval):
             loadTime = line[loadTimeIdx + len(loadTimeSubstr):txnsIdx]
             txnsPerSecond = line[txnsIdx + len(txnsSubstr):objsIdx]
             objsPerSecond = line[objsIdx + len(objsSubstr):-1]
-            totalTime += float(loadTime);
-            totalTxns += float(txnCount)
-            totalObjs += float(objCount)
-            intervalTime += float(loadTime)
-            intervalTxns += float(txnCount)
-            intervalObjs += float(objCount)
+            if int(txnCount) >= minTxnCount:
+                totalTime += float(loadTime)
+                totalTxns += float(txnCount)
+                totalObjs += float(objCount)
+                intervalTime += float(loadTime)
+                intervalTxns += float(txnCount)
+                intervalObjs += float(objCount)
+            totalLoadTime += float(loadTime)
+            intervalLoadTime += float(loadTime)
 
             if start == 0:
@@ -86,31 +91,36 @@ def parseLogs(filename, interval):
-            print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]")
-            print(loadTime + " : "
-                  + txnCount + " : "
-                  + objCount + " : "
-                  + txnsPerSecond + " : "
-                  + objsPerSecond)
-            print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, txPerSec, objsPerSec]: ")
-            print(str(intervalLedgers) + " : "
-                  + str(intervalEnd - intervalStart) + " : "
-                  + str(intervalLedgersPerSecond) + " : "
-                  + str(intervalTxns/intervalTime) + " : "
-                  + str(intervalObjs/intervalTime))
-            print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, txPerSec, objsPerSec]")
-            print(str(totalLedgers) + " : "
-                  + str(end-start) + " : "
-                  + str(ledgersPerSecond) + " : "
-                  + str(totalTxns/totalTime) + " : "
-                  + str(totalObjs/totalTime))
             if int(sequence) % interval == 0:
-                intervalTime = 0
-                intervalTxns = 0
-                intervalObjs = 0
-                intervalStart = 0
-                intervalEnd = 0
-                intervalLedgers = 0
+
+                print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]")
+                print(loadTime + " : "
+                      + txnCount + " : "
+                      + objCount + " : "
+                      + txnsPerSecond + " : "
+                      + objsPerSecond)
+                print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ")
+                print(str(intervalLedgers) + " : "
+                      + str(intervalEnd - intervalStart) + " : "
+                      + str(intervalLedgersPerSecond) + " : "
+                      + str(intervalLoadTime/intervalLedgers) + " : "
+                      + str(intervalTxns/intervalTime) + " : "
+                      + str(intervalObjs/intervalTime))
+                print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]")
+                print(str(totalLedgers) + " : "
+                      + str(end-start) + " : "
+                      + str(ledgersPerSecond) + " : "
+                      + str(totalLoadTime/totalLedgers) + " : "
+                      + str(totalTxns/totalTime) + " : "
+                      + str(totalObjs/totalTime))
+            if int(sequence) % interval == 0:
+                intervalTime = 0
+                intervalTxns = 0
+                intervalObjs = 0
+                intervalStart = 0
+                intervalEnd = 0
+                intervalLedgers = 0
+                intervalLoadTime = 0
 
@@ -118,10 +128,11 @@ def parseLogs(filename, interval):
 parser = argparse.ArgumentParser(description='parses logs')
 parser.add_argument("--filename")
 parser.add_argument("--interval",default=100000)
+parser.add_argument("--minTxnCount",default=0)
 
 args = parser.parse_args()
 
 def run(args):
-    parseLogs(args.filename, int(args.interval))
+    parseLogs(args.filename, int(args.interval), int(args.minTxnCount))
 
 run(args)
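A note on the metrics.py change above: ledgers with fewer than minTxnCount transactions are now excluded from the throughput aggregates, while load time is always accumulated, so the new avgLoadTime column reflects every ledger. A minimal sketch of that aggregation, using made-up (loadTime, txnCount, objCount) samples in place of real log lines:

    # Made-up (loadTime, txnCount, objCount) samples standing in for log lines.
    samples = [(0.8, 120, 900), (0.1, 3, 10), (1.2, 250, 2100)]
    minTxnCount = 10

    totalTime = totalTxns = totalObjs = totalLoadTime = 0.0
    totalLedgers = 0
    for loadTime, txnCount, objCount in samples:
        totalLedgers += 1
        totalLoadTime += loadTime          # counted for every ledger
        if txnCount >= minTxnCount:        # throughput skips near-empty ledgers
            totalTime += loadTime
            totalTxns += txnCount
            totalObjs += objCount

    print("avgLoadTime =", totalLoadTime / totalLedgers)  # the new column
    print("txPerSec =", totalTxns / totalTime)
    print("objsPerSec =", totalObjs / totalTime)

Here the second sample is too small to count toward tx/sec, but its load time still feeds the average, which keeps avgLoadTime honest when most ledgers are near-empty.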
diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp
index dcbec44f..a139e5b7 100644
--- a/reporting/CassandraBackend.cpp
+++ b/reporting/CassandraBackend.cpp
@@ -369,13 +369,15 @@ CassandraBackend::open()
         cass_cluster_set_credentials(
             cluster, username.c_str(), getString("password").c_str());
     }
+    int threads = config_.contains("threads")
+        ? config_["threads"].as_int64()
+        : std::thread::hardware_concurrency();
 
-    unsigned int const workers = std::thread::hardware_concurrency();
-    rc = cass_cluster_set_num_threads_io(cluster, workers);
+    rc = cass_cluster_set_num_threads_io(cluster, threads);
     if (rc != CASS_OK)
     {
         std::stringstream ss;
-        ss << "nodestore: Error setting Cassandra io threads to " << workers
+        ss << "nodestore: Error setting Cassandra io threads to " << threads
            << ", result: " << rc << ", " << cass_error_desc(rc);
         throw std::runtime_error(ss.str());
     }
@@ -538,19 +540,6 @@ CassandraBackend::open()
             continue;
 
         query = {};
-        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
-              << " ( key blob, created bigint, deleted bigint, PRIMARY KEY "
-                 "(key, created)) with clustering order by (created "
-                 "desc) ";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-
-        query = {};
-        query << "SELECT * FROM " << tablePrefix << "keys"
-              << " LIMIT 1";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-
-        query = {};
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
               << " ( book blob, sequence bigint, key blob, deleted_at "
                  "bigint, PRIMARY KEY "
@@ -636,12 +625,6 @@ CassandraBackend::open()
         if (!insertTransaction_.prepareStatement(query, session_.get()))
             continue;
 
-        query = {};
-        query << "INSERT INTO " << tablePrefix << "keys"
-              << " (key, created, deleted) VALUES (?, ?, ?)";
-        if (!insertKey_.prepareStatement(query, session_.get()))
-            continue;
-
         query = {};
         query << "INSERT INTO " << tablePrefix << "books"
               << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
         if (!deleteBook_.prepareStatement(query, session_.get()))
             continue;
 
@@ -653,12 +636,6 @@ CassandraBackend::open()
-        query = {};
-        query << "SELECT created FROM " << tablePrefix << "keys"
-              << " WHERE key = ? ORDER BY created desc LIMIT 1";
-        if (!getCreated_.prepareStatement(query, session_.get()))
-            continue;
-
         query = {};
         query << "SELECT object, sequence FROM " << tablePrefix << "objects"
               << " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
@@ -689,16 +666,7 @@ CassandraBackend::open()
             continue;
 
         query = {};
-        query << "SELECT key FROM " << tablePrefix << "keys "
-              << " WHERE TOKEN(key) >= ? and created <= ?"
-              << " and deleted > ?"
-              << " PER PARTITION LIMIT 1 LIMIT ?"
-              << " ALLOW FILTERING";
-        if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
-            continue;
-
-        query = {};
-        query << "SELECT key,object FROM " << tablePrefix << "objects "
+        query << "SELECT object,key FROM " << tablePrefix << "objects "
               << " WHERE TOKEN(key) >= ? and sequence <= ? "
               << " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
@@ -757,8 +725,9 @@ CassandraBackend::open()
             continue;
 
         query = {};
-        query << " update " << tablePrefix << "ledger_range"
-              << " set sequence = ? where is_latest = ? if sequence != ?";
+        query
+            << " update " << tablePrefix << "ledger_range"
+            << " set sequence = ? where is_latest = ? if sequence in (?,null)";
         if (!updateLedgerRange_.prepareStatement(query, session_.get()))
             continue;
@@ -784,6 +753,7 @@ CassandraBackend::open()
         setupPreparedStatements = true;
     }
 
+    /*
     while (true)
     {
         std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -820,6 +790,7 @@ CassandraBackend::open()
         }
         break;
     }
+    */
 
     if (config_.contains("max_requests_outstanding"))
     {
diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h
index fc062021..eded2cb7 100644
--- a/reporting/CassandraBackend.h
+++ b/reporting/CassandraBackend.h
@@ -385,6 +385,26 @@ public:
         curGetIndex_++;
         return {buf, buf + bufSize};
     }
+    /*
+    uint32_t
+    getNumBytes()
+    {
+        if (!row_)
+            throw std::runtime_error("CassandraResult::getBytes - no result");
+        cass_byte_t const* buf;
+        std::size_t bufSize;
+        CassError rc = cass_value_get_bytes(
+            cass_row_get_column(row_, curGetIndex_), &buf, &bufSize);
+        if (rc != CASS_OK)
+        {
+            std::stringstream msg;
+            msg << "CassandraResult::getBytes - error getting value: " << rc
+                << ", " << cass_error_desc(rc);
+            BOOST_LOG_TRIVIAL(error) << msg.str();
+            throw std::runtime_error(msg.str());
+        }
+        return bufSize;
+    }*/
 
     ripple::uint256
     getUInt256()
@@ -759,7 +779,7 @@ public:
         CassandraStatement statement{updateLedgerRange_};
         statement.bindInt(ledgerSequence_);
         statement.bindBoolean(true);
-        statement.bindInt(ledgerSequence_);
+        statement.bindInt(ledgerSequence_ - 1);
         return executeSyncUpdate(statement);
     }
     void
@@ -979,11 +999,11 @@ public:
             size_t prevSize = objects.size();
             do
             {
-                ripple::uint256 key = result.getUInt256();
                 std::vector<unsigned char> object = result.getBytes();
                 if (object.size())
                 {
-                    objects.push_back({std::move(key), std::move(object)});
+                    objects.push_back(
+                        {result.getUInt256(), std::move(object)});
                 }
             } while (result.nextRow());
             size_t prevBatchSize = objects.size() - prevSize;
@@ -997,17 +1017,7 @@ public:
             }
             if (objects.size() < limit)
             {
-                BOOST_LOG_TRIVIAL(debug)
-                    << __func__
-                    << " cur limit = " << std::to_string(curLimit)
-                    << " , numRows = " << std::to_string(prevBatchSize);
-                double sparsity =
-                    (double)(curLimit + 1) / (double)(prevBatchSize + 1);
-                curLimit = (limit - objects.size()) * sparsity;
-                BOOST_LOG_TRIVIAL(debug)
-                    << __func__
-                    << " - sparsity = " << std::to_string(sparsity)
-                    << " , curLimit = " << std::to_string(curLimit);
+                curLimit = 2048;
             }
             assert(objects.size());
             currentCursor = objects[objects.size() - 1].key;
@@ -1517,13 +1527,6 @@ public:
             throw std::runtime_error("decrementing num outstanding below 0");
         }
         size_t cur = (--numRequestsOutstanding_);
-        // sanity check
-        if (!canAddRequest())
-        {
-            assert(false);
-            throw std::runtime_error(
-                "decremented num outstanding but can't add more");
-        }
         {
             // mutex lock required to prevent race condition around spurious
             // wakeup
diff --git a/reporting/ETLHelpers.h b/reporting/ETLHelpers.h
index 24593fc5..1f225a86 100644
--- a/reporting/ETLHelpers.h
+++ b/reporting/ETLHelpers.h
@@ -110,7 +110,7 @@ class ThreadSafeQueue
 public:
     /// @param maxSize maximum size of the queue. Calls that would cause the
    /// queue to exceed this size will block until free space is available
-    explicit ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
+    ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
    {
    }
diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp
index c98d2705..35b07ff2 100644
--- a/reporting/ETLSource.cpp
+++ b/reporting/ETLSource.cpp
@@ -622,7 +622,7 @@ ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
         auto [status, data] = source->fetchLedger(ledgerSequence, getObjects);
         response = std::move(data);
-        if (status.ok() && response.validated())
+        if (status.ok() && (response.validated() || true))
         {
             BOOST_LOG_TRIVIAL(info)
                 << "Successfully fetched ledger = " << ledgerSequence
@@ -819,7 +819,7 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
             << __func__ << " : "
             << "Attempting to execute func. ledger sequence = "
             << ledgerSequence << " - source = " << source->toString();
-        if (source->hasLedger(ledgerSequence))
+        if (source->hasLedger(ledgerSequence) || true)
         {
             bool res = f(source);
             if (res)
diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp
index 876c4ebc..1fbc6416 100644
--- a/reporting/Pg.cpp
+++ b/reporting/Pg.cpp
@@ -863,6 +863,8 @@ BEGIN
     RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
+CREATE TRIGGER verify_ancestry BEFORE INSERT OR UPDATE on ledgers
+    FOR EACH ROW EXECUTE PROCEDURE insert_ancestry();
 
 -- Trigger function prior to delete on ledgers table. Disallow gaps from
 -- forming. Do not allow deletions if both the previous and next ledgers
diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp
index 245a9137..be1f77b8 100644
--- a/reporting/PostgresBackend.cpp
+++ b/reporting/PostgresBackend.cpp
@@ -581,7 +581,7 @@ PostgresBackend::finishWrites() const
     accountTxBuffer_.str("");
     accountTxBuffer_.clear();
     numRowsInObjectsBuffer_ = 0;
-    return true;
+    return !abortWrite_;
 }
 
 bool
 PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
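The two backend changes above are two halves of the same conflict-detection scheme: the Cassandra ledger_range update is now a lightweight transaction that succeeds only when the stored sequence is the expected predecessor (hence bindInt(ledgerSequence_ - 1) in CassandraBackend.h) or null, and PostgresBackend::finishWrites() now surfaces failure through abortWrite_ instead of always returning true. A sketch of the compare-and-set semantics, assuming a plain dict in place of the real table:

    # Hypothetical stand-in for the ledger_range table; a dict replaces
    # Cassandra's lightweight-transaction machinery.
    ledger_range = {"sequence": None}

    def update_ledger_range(new_seq):
        # Mirrors "set sequence = ? ... if sequence in (?, null)": the write
        # succeeds only if no other writer advanced the range first.
        expected = new_seq - 1
        if ledger_range["sequence"] in (expected, None):
            ledger_range["sequence"] = new_seq
            return True
        return False  # a conflicting writer won the race

    assert update_ledger_range(5)        # empty range: first writer wins
    assert update_ledger_range(6)        # sequence == 5, the expected value
    assert not update_ledger_range(6)    # replay or conflict is rejected

The point of allowing null as well as the predecessor is that the very first write, against an empty range, must also succeed without a separate code path.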
diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp
index ed429dd8..5601a1a9 100644
--- a/reporting/ReportingETL.cpp
+++ b/reporting/ReportingETL.cpp
@@ -154,9 +154,9 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
     size_t numAttempts = 0;
     while (!stopping_)
     {
-        auto ledger = flatMapBackend_->fetchLedgerBySequence(ledgerSequence);
+        auto range = flatMapBackend_->fetchLedgerRange();
 
-        if (!ledger)
+        if (!range || range->maxSequence < ledgerSequence)
         {
             BOOST_LOG_TRIVIAL(warning)
                 << __func__ << " : "
@@ -317,8 +317,10 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
 
 // Database must be populated when this starts
 std::optional<uint32_t>
-ReportingETL::runETLPipeline(uint32_t startSequence)
+ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
 {
+    if (finishSequence_ && startSequence > finishSequence_)
+        return {};
     /*
      * Behold, mortals! This function spawns three separate threads, which talk
      * to each other via 2 different thread safe queues and 1 atomic variable.
@@ -354,69 +356,99 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
     std::atomic_bool writeConflict = false;
     std::optional<uint32_t> lastPublishedSequence;
-    constexpr uint32_t maxQueueSize = 1000;
+    uint32_t maxQueueSize = 1000 / numExtractors;
 
     auto begin = std::chrono::system_clock::now();
+    using QueueType =
+        ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>>;
+    std::vector<std::shared_ptr<QueueType>> queues;
 
-    ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>>
-        transformQueue{maxQueueSize};
+    auto getNext = [&queues, &startSequence, &numExtractors](
+                       uint32_t sequence) -> std::shared_ptr<QueueType> {
+        std::cout << std::to_string((sequence - startSequence) % numExtractors);
+        return queues[(sequence - startSequence) % numExtractors];
+    };
+    std::vector<std::thread> threads;
+    for (size_t i = 0; i < numExtractors; ++i)
+    {
+        auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
+        queues.push_back(transformQueue);
+        std::cout << "added to queues";
 
-    std::thread extracter{[this,
-                           &startSequence,
-                           &writeConflict,
-                           &transformQueue]() {
-        beast::setCurrentThreadName("rippled: ReportingETL extract");
-        uint32_t currentSequence = startSequence;
+        threads.emplace_back([this,
+                              &startSequence,
+                              &writeConflict,
+                              transformQueue,
+                              i,
+                              numExtractors]() {
+            beast::setCurrentThreadName("rippled: ReportingETL extract");
+            uint32_t currentSequence = startSequence + i;
 
-        // there are two stopping conditions here.
-        // First, if there is a write conflict in the load thread, the ETL
-        // mechanism should stop.
-        // The other stopping condition is if the entire server is shutting
-        // down. This can be detected in a variety of ways. See the comment
-        // at the top of the function
-        while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
-                   currentSequence) &&
-               !writeConflict && !isStopping())
-        {
-            auto start = std::chrono::system_clock::now();
-            std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
-                fetchLedgerDataAndDiff(currentSequence)};
-            auto end = std::chrono::system_clock::now();
+            double totalTime = 0;
 
-            auto time = ((end - start).count()) / 1000000000.0;
-            auto tps =
-                fetchResponse->transactions_list().transactions_size() / time;
-
-            BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time
-                                    << " . Extract phase tps = " << tps;
-            // if the fetch is unsuccessful, stop. fetchLedger only returns
-            // false if the server is shutting down, or if the ledger was
-            // found in the database (which means another process already
-            // wrote the ledger that this process was trying to extract;
-            // this is a form of a write conflict). Otherwise,
-            // fetchLedgerDataAndDiff will keep trying to fetch the
-            // specified ledger until successful
-            if (!fetchResponse)
+            // there are two stopping conditions here.
+            // First, if there is a write conflict in the load thread, the
+            // ETL mechanism should stop. The other stopping condition is if
+            // the entire server is shutting down. This can be detected in a
+            // variety of ways. See the comment at the top of the function
+            while ((!finishSequence_ || currentSequence <= finishSequence_) &&
+                   networkValidatedLedgers_.waitUntilValidatedByNetwork(
+                       currentSequence) &&
+                   !writeConflict && !isStopping())
             {
-                break;
-            }
+                auto start = std::chrono::system_clock::now();
+                std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
+                    fetchResponse{fetchLedgerDataAndDiff(currentSequence)};
+                auto end = std::chrono::system_clock::now();
 
-            transformQueue.push(std::move(fetchResponse));
-            ++currentSequence;
-        }
-        // empty optional tells the transformer to shut down
-        transformQueue.push({});
-    }};
+                // if the fetch is unsuccessful, stop. fetchLedger only
+                // returns false if the server is shutting down, or if the
+                // ledger was found in the database (which means another
+                // process already wrote the ledger that this process was
+                // trying to extract; this is a form of a write conflict).
+                // Otherwise, fetchLedgerDataAndDiff will keep trying to
+                // fetch the specified ledger until successful
+                if (!fetchResponse)
+                {
+                    break;
+                }
+
+                auto time = ((end - start).count()) / 1000000000.0;
+                totalTime += time;
+
+                auto tps =
+                    fetchResponse->transactions_list().transactions_size() /
+                    time;
+
+                BOOST_LOG_TRIVIAL(info)
+                    << "Extract phase time = " << time
+                    << " . Extract phase tps = " << tps
+                    << " . Avg extract time = "
+                    << totalTime / (currentSequence - startSequence + 1)
+                    << " . thread num = " << i
+                    << " . seq = " << currentSequence;
+
+                transformQueue->push(std::move(fetchResponse));
+                currentSequence += numExtractors;
+                if (finishSequence_ && currentSequence > finishSequence_)
+                    break;
+            }
+            // empty optional tells the transformer to shut down
+            transformQueue->push({});
+        });
+    }
 
     std::thread transformer{[this,
                              &writeConflict,
-                             &transformQueue,
+                             &startSequence,
+                             &getNext,
                              &lastPublishedSequence]() {
         beast::setCurrentThreadName("rippled: ReportingETL transform");
+        uint32_t currentSequence = startSequence;
 
         while (!writeConflict)
         {
             std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
-                transformQueue.pop()};
+                getNext(currentSequence)->pop()};
+            ++currentSequence;
             // if fetchResponse is an empty optional, the extracter thread
             // has stopped and the transformer should stop as well
             if (!fetchResponse)
@@ -434,20 +466,25 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
             auto end = std::chrono::system_clock::now();
 
             auto duration = ((end - start).count()) / 1000000000.0;
-            BOOST_LOG_TRIVIAL(info)
-                << "Load phase of etl : "
-                << "Successfully published ledger! Ledger info: "
-                << detail::toString(lgrInfo) << ". txn count = " << numTxns
-                << ". object count = " << numObjects
-                << ". load time = " << duration
-                << ". load txns per second = " << numTxns / duration
-                << ". load objs per second = " << numObjects / duration;
+            if (success)
+                BOOST_LOG_TRIVIAL(info)
+                    << "Load phase of etl : "
+                    << "Successfully published ledger! Ledger info: "
+                    << detail::toString(lgrInfo) << ". txn count = " << numTxns
+                    << ". object count = " << numObjects
+                    << ". load time = " << duration
+                    << ". load txns per second = " << numTxns / duration
+                    << ". load objs per second = " << numObjects / duration;
+            else
+                BOOST_LOG_TRIVIAL(error)
+                    << "Error writing ledger. " << detail::toString(lgrInfo);
             // success is false if the ledger was already written
             if (success)
             {
                 publishLedger(lgrInfo);
                 lastPublishedSequence = lgrInfo.seq;
             }
+            writeConflict = !success;
             auto range = flatMapBackend_->fetchLedgerRange();
             if (onlineDeleteInterval_ && !deleting_ &&
                 range->maxSequence - range->minSequence >
@@ -466,7 +503,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
     }};
 
     // wait for all of the threads to stop
-    extracter.join();
+    for (auto& t : threads)
+        t.join();
     transformer.join();
     auto end = std::chrono::system_clock::now();
     BOOST_LOG_TRIVIAL(debug)
@@ -599,7 +637,7 @@ ReportingETL::monitor()
         // doContinousETLPipelined returns the most recent sequence
         // published empty optional if no sequence was published
         std::optional<uint32_t> lastPublished =
-            runETLPipeline(nextSequence);
+            runETLPipeline(nextSequence, extractorThreads_);
         BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                                 << "Aborting ETL. Falling back to publishing";
@@ -659,9 +697,13 @@ ReportingETL::ReportingETL(
     flatMapBackend_->open();
     if (config.contains("start_sequence"))
         startSequence_ = config.at("start_sequence").as_int64();
+    if (config.contains("finish_sequence"))
+        finishSequence_ = config.at("finish_sequence").as_int64();
     if (config.contains("read_only"))
         readOnly_ = config.at("read_only").as_bool();
     if (config.contains("online_delete"))
         onlineDeleteInterval_ = config.at("online_delete").as_int64();
+    if (config.contains("extractor_threads"))
+        extractorThreads_ = config.at("extractor_threads").as_int64();
 }
diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h
index 1873a66a..ca04f020 100644
--- a/reporting/ReportingETL.h
+++ b/reporting/ReportingETL.h
@@ -61,6 +61,7 @@ class ReportingETL
 private:
     std::unique_ptr<BackendInterface> flatMapBackend_;
     std::optional<uint32_t> onlineDeleteInterval_;
+    uint32_t extractorThreads_ = 1;
     std::thread worker_;
     boost::asio::io_context& ioContext_;
@@ -130,6 +131,7 @@ private:
     /// the next ledger validated by the network. If this is set, and the
     /// database is already populated, an error is thrown.
     std::optional<uint32_t> startSequence_;
+    std::optional<uint32_t> finishSequence_;
 
     /// The time that the most recently published ledger was published. Used by
     /// server_info
@@ -166,7 +168,7 @@ private:
     /// @param startSequence the first ledger to extract
     /// @return the last ledger written to the database, if any
     std::optional<uint32_t>
-    runETLPipeline(uint32_t startSequence);
+    runETLPipeline(uint32_t startSequence, int numExtractors);
 
     /// Monitor the network for newly validated ledgers. Also monitor the
     /// database to see if any process is writing those ledgers. This function
diff --git a/test.py b/test.py
index 7b014ef2..a292f2cf 100755
--- a/test.py
+++ b/test.py
@@ -33,24 +33,23 @@ def compareAccountInfo(aldous, p2p):
         print("Response mismatch")
         print(aldous)
         print(p2p)
-
 def compareTx(aldous, p2p):
     p2p = p2p["result"]
     if aldous["transaction"] != p2p["tx"]:
-        print("Transaction mismatch")
+        print("transaction mismatch")
         print(aldous["transaction"])
         print(p2p["tx"])
         return False
-    if aldous["metadata"] != p2p["meta"]:
-        print("Metadata mismatch")
-        print(aldous["metadata"])
-        print(p2p["metadata"])
+    if aldous["metadata"] != p2p["meta"] and not isinstance(p2p["meta"],dict):
+        print("metadata mismatch")
+        print("aldous : " + aldous["metadata"])
+        print("p2p : " + str(p2p["meta"]))
         return False
-    if aldous["ledger_sequence"] != p2p["ledger_sequence"]:
+    if aldous["ledger_sequence"] != p2p["ledger_index"]:
         print("ledger sequence mismatch")
         print(aldous["ledger_sequence"])
-        print(p2p["ledger_sequence"])
-    print("Responses match!!")
+        print(p2p["ledger_index"])
+    print("responses match!!")
     return True
 
 def compareAccountTx(aldous, p2p):
@@ -63,8 +62,6 @@ def compareAccountTx(aldous, p2p):
     p2pMetas = []
     p2pLedgerSequences = []
     for x in p2p["transactions"]:
-        if int(x["ledger_index"]) > maxLedger:
-            continue
         p2pTxns.append(x["tx_blob"])
         p2pMetas.append(x["meta"])
         p2pLedgerSequences.append(x["ledger_index"])
@@ -72,8 +69,6 @@ def compareAccountTx(aldous, p2p):
     aldousMetas = []
     aldousLedgerSequences = []
     for x in aldous["transactions"]:
-        if int(x["ledger_sequence"]) < minLedger:
-            continue
         aldousTxns.append(x["transaction"])
         aldousMetas.append(x["metadata"])
         aldousLedgerSequences.append(x["ledger_sequence"])
@@ -171,7 +166,7 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None):
     except websockets.exceptions.ConnectionClosedError as e:
         print(e)
 
-async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None):
+async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None,numPages=10):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         cursor = None
@@ -202,11 +197,14 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
                     results["transactions"].extend(res["transactions"])
                 if "cursor" in res:
                     cursor = {"ledger_sequence":res["cursor"]["ledger_sequence"],"transaction_index":res["cursor"]["transaction_index"]}
+                    print(cursor)
                 elif "result" in res and "marker" in res["result"]:
-                    print(res["result"]["marker"])
                     marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]}
+                    print(marker)
                 else:
                     break
+                if numCalls > numPages:
+                    break
         return results
     except websockets.exceptions.ConnectionClosedError as e:
         print(e)
@@ -218,6 +216,7 @@ async def tx(ip, port, tx_hash, binary):
             await ws.send(json.dumps({"command":"tx","transaction":tx_hash,"binary":bool(binary)}))
             res = json.loads(await ws.recv())
             print(json.dumps(res,indent=4,sort_keys=True))
+            return res
     except websockets.exceptions.connectionclosederror as e:
         print(e)
 
@@ -228,17 +227,41 @@ async def ledger_entry(ip, port, index, ledger, binary):
             await ws.send(json.dumps({"command":"ledger_entry","index":index,"binary":bool(binary),"ledger_index":int(ledger)}))
             res = json.loads(await ws.recv())
             print(json.dumps(res,indent=4,sort_keys=True))
+            if "result" in res:
+                res = res["result"]
+            if "object" in res:
+                return (index,res["object"])
+            else:
+                return (index,res["node_binary"])
     except websockets.exceptions.connectionclosederror as e:
         print(e)
 
+async def ledger_entries(ip, port,ledger):
+    address = 'ws://' + str(ip) + ':' + str(port)
+    entries = await ledger_data(ip, port, ledger, 200, True)
+    try:
+        async with websockets.connect(address) as ws:
+            objects = []
+            for x,y in zip(entries[0],entries[1]):
+                await ws.send(json.dumps({"command":"ledger_entry","index":x,"binary":True,"ledger_index":int(ledger)}))
+                res = json.loads(await ws.recv())
+                objects.append((x,res["object"]))
+                if res["object"] != y:
+                    print("data mismatch")
+                    return None
+            print("Data matches!")
+            return objects
+    except websockets.exceptions.connectionclosederror as e:
+        print(e)
 
 async def ledger_data(ip, port, ledger, limit, binary):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         async with websockets.connect(address) as ws:
-            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary)}))
+            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit)}))
             res = json.loads(await ws.recv())
-            print(json.dumps(res,indent=4,sort_keys=True))
             objects = []
             blobs = []
             keys = []
@@ -246,10 +269,14 @@ async def ledger_data(ip, port, ledger, limit, binary):
                 objects = res["result"]["state"]
             else:
                 objects = res["objects"]
-            for x in objects:
-                blobs.append(x["data"])
-                keys.append(x["index"])
-            return (keys,blobs)
+            if binary:
+                for x in objects:
+                    blobs.append(x["data"])
+                    keys.append(x["index"])
+                    if len(x["index"]) != 64:
+                        print("bad key")
+            return (keys,blobs)
 
     except websockets.exceptions.connectionclosederror as e:
         print(e)
@@ -271,11 +298,13 @@ async def ledger_data_full(ip, port, ledger, binary, limit):
         blobs = []
         keys = []
         async with websockets.connect(address) as ws:
+            if int(limit) < 2048:
+                limit = 2048
             marker = None
             while True:
                 res = {}
                 if marker is None:
-                    await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary), "limit":int(limit)}))
+                    await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":binary, "limit":int(limit)}))
                     res = json.loads(await ws.recv())
                 else:
@@ -334,7 +363,7 @@ def compare_book_offers(aldous, p2p):
 
-async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary):
+async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary, limit):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
@@ -348,7 +377,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
             taker_pays = json.loads("{\"currency\":\"" + pay_currency + "\"}")
             if pay_issuer is not None:
                 taker_pays["issuer"] = pay_issuer
-            req = {"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets, "binary":bool(binary)}
+            req = {"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets, "binary":bool(binary), "limit":int(limit)}
             if cursor is not None:
                 req["cursor"] = cursor
             await ws.send(json.dumps(req))
@@ -360,6 +389,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
                     offers.append(x)
                 if "cursor" in res:
                     cursor = res["cursor"]
+                    print(cursor)
                 else:
                     print(len(offers))
                     return offers
@@ -443,36 +473,70 @@ async def ledger_range(ip, port):
             if "error" in res:
                 await ws.send(json.dumps({"command":"server_info"}))
                 res = json.loads(await ws.recv())
+                print(res)
                 rng = res["result"]["info"]["complete_ledgers"]
+                if rng == "empty":
+                    return (0,0)
                 idx = rng.find("-")
                 return (int(rng[0:idx]),int(rng[idx+1:-1]))
             return (res["ledger_index_min"],res["ledger_index_max"])
     except websockets.exceptions.connectionclosederror as e:
         print(e)
 
+async def fee(ip, port):
+    address = 'ws://' + str(ip) + ':' + str(port)
+    try:
+        async with websockets.connect(address) as ws:
+            await ws.send(json.dumps({"command":"fee"}))
+            res = json.loads(await ws.recv())
+            print(json.dumps(res,indent=4,sort_keys=True))
+    except websockets.exceptions.connectionclosederror as e:
+        print(e)
+
+async def ledger_diff(ip, port, base, desired, includeBlobs):
+    address = 'ws://' + str(ip) + ':' + str(port)
+    try:
+        async with websockets.connect(address) as ws:
+            await ws.send(json.dumps({"command":"ledger_diff","base_ledger":int(base),"desired_ledger":int(desired),"include_blobs":bool(includeBlobs)}))
+            res = json.loads(await ws.recv())
+            print(json.dumps(res,indent=4,sort_keys=True))
+    except websockets.exceptions.connectionclosederror as e:
+        print(e)
+
+async def perf(ip, port):
+    res = await ledger_range(ip,port)
+    time.sleep(10)
+    res2 = await ledger_range(ip,port)
+    lps = ((int(res2[1]) - int(res[1])) / 10.0)
+    print(lps)
 
 parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
-parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry"])
+parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry","ledger_diff","ledger_entries","perf","fee"])
 parser.add_argument('--ip', default='127.0.0.1')
 parser.add_argument('--port', default='8080')
 parser.add_argument('--hash')
-parser.add_argument('--account', default="rw2ciyaNshpHe7bCHo4bRWq6pqqynnWKQg")
+parser.add_argument('--account')
 parser.add_argument('--ledger')
 parser.add_argument('--limit', default='200')
 parser.add_argument('--taker_pays_issuer',default='rvYAfWj5gh67oV6fW32ZzP3Aw4Eubs59B')
 parser.add_argument('--taker_pays_currency',default='USD')
 parser.add_argument('--taker_gets_issuer')
 parser.add_argument('--taker_gets_currency',default='XRP')
-parser.add_argument('--p2pIp', default='127.0.0.1')
-parser.add_argument('--p2pPort', default='6005')
+parser.add_argument('--p2pIp', default='s2.ripple.com')
+parser.add_argument('--p2pPort', default='51233')
 parser.add_argument('--verify',default=False)
-parser.add_argument('--binary',default=False)
+parser.add_argument('--binary',default=True)
 parser.add_argument('--expand',default=False)
 parser.add_argument('--transactions',default=False)
 parser.add_argument('--minLedger',default=-1)
 parser.add_argument('--maxLedger',default=-1)
 parser.add_argument('--filename',default=None)
 parser.add_argument('--index')
+parser.add_argument('--numPages',default=3)
+parser.add_argument('--base')
+parser.add_argument('--desired')
+parser.add_argument('--includeBlobs',default=False)
@@ -483,7 +547,12 @@ def run(args):
     asyncio.set_event_loop(asyncio.new_event_loop())
     if(args.ledger is None):
         args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1]
-    if args.action == "account_info":
+    if args.action == "fee":
+        asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port))
+    elif args.action == "perf":
+        asyncio.get_event_loop().run_until_complete(
+            perf(args.ip,args.port))
+    elif args.action == "account_info":
         res1 = asyncio.get_event_loop().run_until_complete(
             account_info(args.ip, args.port, args.account, args.ledger, args.binary))
         if args.verify:
@@ -493,18 +562,46 @@ def run(args):
     elif args.action == "ledger_entry":
         asyncio.get_event_loop().run_until_complete(
             ledger_entry(args.ip, args.port, args.index, args.ledger, args.binary))
+    elif args.action == "ledger_entries":
+        res = asyncio.get_event_loop().run_until_complete(
+            ledger_entries(args.ip, args.port, args.ledger))
+        if args.verify:
+            objects = []
+            for x in res:
+                res2 = asyncio.get_event_loop().run_until_complete(
+                    ledger_entry(args.p2pIp, args.p2pPort, x[0], args.ledger, True))
+                if res2[1] != x[1]:
+                    print("mismatch!")
+                    return
+            print("Data matches!")
+    elif args.action == "ledger_diff":
+        asyncio.get_event_loop().run_until_complete(
+            ledger_diff(args.ip, args.port, args.base, args.desired, args.includeBlobs))
     elif args.action == "tx":
+        if args.verify:
+            args.binary = True
         if args.hash is None:
             args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
-        asyncio.get_event_loop().run_until_complete(
+        res = asyncio.get_event_loop().run_until_complete(
             tx(args.ip, args.port, args.hash, args.binary))
+        if args.verify:
+            res2 = asyncio.get_event_loop().run_until_complete(
+                tx(args.p2pIp, args.p2pPort, args.hash, args.binary))
+            print(compareTx(res,res2))
     elif args.action == "account_tx":
         if args.verify:
             args.binary=True
+        if args.account is None:
+            args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
+            res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
+            args.account = res["transaction"]["Account"]
-        rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))
         res = asyncio.get_event_loop().run_until_complete(
             account_tx(args.ip, args.port, args.account, args.binary))
+        rng = getMinAndMax(res)
 
         if args.verify:
             res2 = asyncio.get_event_loop().run_until_complete(
@@ -513,13 +610,19 @@ def run(args):
     elif args.action == "account_tx_full":
         if args.verify:
             args.binary=True
-        rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))
+        if args.account is None:
+            args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
+            res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
+            args.account = res["transaction"]["Account"]
         res = asyncio.get_event_loop().run_until_complete(
-            account_tx_full(args.ip, args.port, args.account, args.binary))
+            account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages)))
+        rng = getMinAndMax(res)
         print(len(res["transactions"]))
 
         if args.verify:
+            print("requesting p2p node")
             res2 = asyncio.get_event_loop().run_until_complete(
-                account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1]))
+                account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1],int(args.numPages)))
             print(compareAccountTx(res,res2))
 
     elif args.action == "ledger_data":
@@ -559,10 +662,10 @@ def run(args):
         if args.verify:
             args.binary=True
         res = asyncio.get_event_loop().run_until_complete(
-            book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary))
+            book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary, args.limit))
         if args.verify:
             res2 = asyncio.get_event_loop().run_until_complete(
-                book_offers(args.p2pIp, args.p2pPort, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary))
+                book_offers(args.p2pIp, args.p2pPort, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary, args.limit))
             print(compare_book_offers(res,res2))
 
     else: