diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp index 6986ac1a..34581626 100644 --- a/handlers/AccountInfo.cpp +++ b/handlers/AccountInfo.cpp @@ -73,8 +73,12 @@ doAccountInfo( if (!accountID) { - response["error"] = "couldnt decode account"; - return response; + accountID = ripple::AccountID(); + if (!accountID->parseHex(request.at("account").as_string().c_str())) + { + response["error"] = "account malformed"; + return response; + } } auto key = ripple::keylet::account(accountID.value()); diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index 8b35e293..c4d4430d 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -137,12 +137,16 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) return response; } - auto const account = ripple::parseBase58( + auto account = ripple::parseBase58( request.at("account").as_string().c_str()); if (!account) { - response["error"] = "account malformed"; - return response; + account = ripple::AccountID(); + if (!account->parseHex(request.at("account").as_string().c_str())) + { + response["error"] = "account malformed"; + return response; + } } auto ledgerSequence = ledgerSequenceFromRequest(request, backend); if (!ledgerSequence) @@ -182,8 +186,11 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) request.at("limit").kind() == boost::json::kind::int64) limit = request.at("limit").as_int64(); boost::json::array txns; + auto start = std::chrono::system_clock::now(); auto [blobs, retCursor] = backend.fetchAccountTransactions(*account, limit, cursor); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " db fetch took " << ((end - start).count() / 1000000000.0) << " num blobs = " << blobs.size(); for (auto const& txnPlusMeta : blobs) { if (txnPlusMeta.ledgerSequence > ledgerSequence) @@ -216,6 +223,8 @@ doAccountTx(boost::json::object const& request, BackendInterface const& backend) cursorJson["transaction_index"] = retCursor->transactionIndex; response["cursor"] = cursorJson; } + auto end2 = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " serialization took " << ((end2 - end).count() / 1000000000.0); return response; } diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index cd8c4cae..1a26173b 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -57,196 +57,214 @@ doBookOffers( response["error"] = "Empty database"; return response; } - if (!request.contains("taker_pays")) + ripple::uint256 bookBase; + if (request.contains("book")) { - response["error"] = "Missing field taker_pays"; - return response; - } - - if (!request.contains("taker_gets")) - { - response["error"] = "Missing field taker_gets"; - return response; - } - - boost::json::object taker_pays; - if (request.at("taker_pays").kind() == boost::json::kind::object) - { - taker_pays = request.at("taker_pays").as_object(); - } - else - { - response["error"] = "Invalid field taker_pays"; - return response; - } - - boost::json::object taker_gets; - if (request.at("taker_gets").kind() == boost::json::kind::object) - { - taker_gets = request.at("taker_gets").as_object(); - } - else - { - response["error"] = "Invalid field taker_gets"; - return response; - } - - if (!taker_pays.contains("currency")) - { - response["error"] = "Missing field taker_pays.currency"; - return response; - } - - if (!taker_pays.at("currency").is_string()) - { - response["error"] = "taker_pays.currency should be string"; - return response; - } 
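Both handler changes above follow the same pattern: try the base58 account form first, then fall back to parsing the value as hex. A minimal sketch of that fallback, assuming a hypothetical helper name (parseAccountFromRequest is not part of this diff):

```cpp
#include <optional>
#include <string>
#include <ripple/protocol/AccountID.h>

// Hypothetical helper: accept either an "r..." base58 address or the
// 40-character hex form of the AccountID.
std::optional<ripple::AccountID>
parseAccountFromRequest(std::string const& text)
{
    if (auto parsed = ripple::parseBase58<ripple::AccountID>(text))
        return *parsed;                 // canonical base58 address

    ripple::AccountID account;
    if (account.parseHex(text))
        return account;                 // raw hex form accepted by this diff

    return {};                          // caller reports "account malformed"
}
```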
- - if (!taker_gets.contains("currency")) - { - response["error"] = "Missing field taker_gets.currency"; - return response; - } - - if (!taker_gets.at("currency").is_string()) - { - response["error"] = "taker_gets.currency should be string"; - return response; - } - - ripple::Currency pay_currency; - - if (!ripple::to_currency( - pay_currency, taker_pays.at("currency").as_string().c_str())) - { - response["error"] = - "Invalid field 'taker_pays.currency', bad currency."; - return response; - } - - ripple::Currency get_currency; - - if (!ripple::to_currency( - get_currency, taker_gets["currency"].as_string().c_str())) - { - response["error"] = - "Invalid field 'taker_gets.currency', bad currency."; - return response; - } - - ripple::AccountID pay_issuer; - - if (taker_pays.contains("issuer")) - { - if (!taker_pays.at("issuer").is_string()) + if (!bookBase.parseHex(request.at("book").as_string().c_str())) { - response["error"] = "taker_pays.issuer should be string"; - return response; - } - - if (!ripple::to_issuer( - pay_issuer, taker_pays.at("issuer").as_string().c_str())) - { - response["error"] = - "Invalid field 'taker_pays.issuer', bad issuer."; - return response; - } - - if (pay_issuer == ripple::noAccount()) - { - response["error"] = - "Invalid field 'taker_pays.issuer', bad issuer account one."; + response["error"] = "Error parsing book"; return response; } } else { - pay_issuer = ripple::xrpAccount(); - } - - if (isXRP(pay_currency) && !isXRP(pay_issuer)) - { - response["error"] = - "Unneeded field 'taker_pays.issuer' for XRP currency " - "specification."; - return response; - } - - if (!isXRP(pay_currency) && isXRP(pay_issuer)) - { - response["error"] = - "Invalid field 'taker_pays.issuer', expected non-XRP issuer."; - return response; - } - - ripple::AccountID get_issuer; - - if (taker_gets.contains("issuer")) - { - if (!taker_gets["issuer"].is_string()) + if (!request.contains("taker_pays")) { - response["error"] = "taker_gets.issuer should be string"; + response["error"] = "Missing field taker_pays"; return response; } - if (!ripple::to_issuer( - get_issuer, taker_gets.at("issuer").as_string().c_str())) + if (!request.contains("taker_gets")) + { + response["error"] = "Missing field taker_gets"; + return response; + } + + boost::json::object taker_pays; + if (request.at("taker_pays").kind() == boost::json::kind::object) + { + taker_pays = request.at("taker_pays").as_object(); + } + else + { + response["error"] = "Invalid field taker_pays"; + return response; + } + + boost::json::object taker_gets; + if (request.at("taker_gets").kind() == boost::json::kind::object) + { + taker_gets = request.at("taker_gets").as_object(); + } + else + { + response["error"] = "Invalid field taker_gets"; + return response; + } + + if (!taker_pays.contains("currency")) + { + response["error"] = "Missing field taker_pays.currency"; + return response; + } + + if (!taker_pays.at("currency").is_string()) + { + response["error"] = "taker_pays.currency should be string"; + return response; + } + + if (!taker_gets.contains("currency")) + { + response["error"] = "Missing field taker_gets.currency"; + return response; + } + + if (!taker_gets.at("currency").is_string()) + { + response["error"] = "taker_gets.currency should be string"; + return response; + } + + ripple::Currency pay_currency; + + if (!ripple::to_currency( + pay_currency, taker_pays.at("currency").as_string().c_str())) { response["error"] = - "Invalid field 'taker_gets.issuer', bad issuer."; + "Invalid field 'taker_pays.currency', bad 
currency."; return response; } - if (get_issuer == ripple::noAccount()) + ripple::Currency get_currency; + + if (!ripple::to_currency( + get_currency, taker_gets["currency"].as_string().c_str())) { response["error"] = - "Invalid field 'taker_gets.issuer', bad issuer account one."; + "Invalid field 'taker_gets.currency', bad currency."; return response; } - } - else - { - get_issuer = ripple::xrpAccount(); - } - if (ripple::isXRP(get_currency) && !ripple::isXRP(get_issuer)) - { - response["error"] = - "Unneeded field 'taker_gets.issuer' for XRP currency " - "specification."; - return response; - } + ripple::AccountID pay_issuer; - if (!ripple::isXRP(get_currency) && ripple::isXRP(get_issuer)) - { - response["error"] = - "Invalid field 'taker_gets.issuer', expected non-XRP issuer."; - return response; - } - - boost::optional takerID; - if (request.contains("taker")) - { - if (!request.at("taker").is_string()) + if (taker_pays.contains("issuer")) { - response["error"] = "taker should be string"; - return response; - } + if (!taker_pays.at("issuer").is_string()) + { + response["error"] = "taker_pays.issuer should be string"; + return response; + } - takerID = ripple::parseBase58( - request.at("taker").as_string().c_str()); - if (!takerID) + if (!ripple::to_issuer( + pay_issuer, taker_pays.at("issuer").as_string().c_str())) + { + response["error"] = + "Invalid field 'taker_pays.issuer', bad issuer."; + return response; + } + + if (pay_issuer == ripple::noAccount()) + { + response["error"] = + "Invalid field 'taker_pays.issuer', bad issuer account " + "one."; + return response; + } + } + else { - response["error"] = "Invalid taker"; + pay_issuer = ripple::xrpAccount(); + } + + if (isXRP(pay_currency) && !isXRP(pay_issuer)) + { + response["error"] = + "Unneeded field 'taker_pays.issuer' for XRP currency " + "specification."; return response; } - } - if (pay_currency == get_currency && pay_issuer == get_issuer) - { - response["error"] = "Bad market"; - return response; + if (!isXRP(pay_currency) && isXRP(pay_issuer)) + { + response["error"] = + "Invalid field 'taker_pays.issuer', expected non-XRP issuer."; + return response; + } + + ripple::AccountID get_issuer; + + if (taker_gets.contains("issuer")) + { + if (!taker_gets["issuer"].is_string()) + { + response["error"] = "taker_gets.issuer should be string"; + return response; + } + + if (!ripple::to_issuer( + get_issuer, taker_gets.at("issuer").as_string().c_str())) + { + response["error"] = + "Invalid field 'taker_gets.issuer', bad issuer."; + return response; + } + + if (get_issuer == ripple::noAccount()) + { + response["error"] = + "Invalid field 'taker_gets.issuer', bad issuer account " + "one."; + return response; + } + } + else + { + get_issuer = ripple::xrpAccount(); + } + + if (ripple::isXRP(get_currency) && !ripple::isXRP(get_issuer)) + { + response["error"] = + "Unneeded field 'taker_gets.issuer' for XRP currency " + "specification."; + return response; + } + + if (!ripple::isXRP(get_currency) && ripple::isXRP(get_issuer)) + { + response["error"] = + "Invalid field 'taker_gets.issuer', expected non-XRP issuer."; + return response; + } + + boost::optional takerID; + if (request.contains("taker")) + { + if (!request.at("taker").is_string()) + { + response["error"] = "taker should be string"; + return response; + } + + takerID = ripple::parseBase58( + request.at("taker").as_string().c_str()); + if (!takerID) + { + response["error"] = "Invalid taker"; + return response; + } + } + + if (pay_currency == get_currency && pay_issuer == 
get_issuer) + { + response["error"] = "Bad market"; + return response; + } + ripple::Book book = { + {pay_currency, pay_issuer}, {get_currency, get_issuer}}; + + bookBase = getBookBase(book); } std::uint32_t limit = 200; @@ -261,10 +279,6 @@ doBookOffers( cursor->parseHex(request.at("cursor").as_string().c_str()); } - ripple::Book book = { - {pay_currency, pay_issuer}, {get_currency, get_issuer}}; - - ripple::uint256 bookBase = getBookBase(book); auto start = std::chrono::system_clock::now(); std::cout << "getting Book Offers" << std::endl; auto [offers, retCursor] = diff --git a/metrics.py b/metrics.py index e13eb630..32b612f9 100644 --- a/metrics.py +++ b/metrics.py @@ -10,8 +10,36 @@ def getTime(line): timestamp = datetime.strptime(timestampSub, '%Y-%m-%d %H:%M:%S.%f') return timestamp.timestamp() +def parseAccountTx(filename): -def parseLogs(filename, interval): + + with open(filename) as f: + totalProcTime = 0.0 + totalTxnTime = 0.0 + numCalls = 0 + for line in f: + if "executed stored_procedure" in line: + idx = line.find("in ") + idx = idx + 3 + idx2 = line.find("num") + procTime = float(line[idx:idx2]) + totalProcTime += procTime + if "fetchTransactions fetched" in line: + idx = line.find("took ") + idx = idx + 5 + txnTime = float(line[idx:]) + totalTxnTime += txnTime + numCalls = numCalls + 1 + print(totalProcTime) + print(totalProcTime/numCalls) + print(totalTxnTime) + print(totalTxnTime/numCalls) + + + + + +def parseLogs(filename, interval, minTxnCount = 0): with open(filename) as f: @@ -33,7 +61,9 @@ def parseLogs(filename, interval): intervalStart = 0 intervalEnd = 0 intervalLedgers = 0 + ledgersPerSecond = 0 + print("ledgers, transactions, objects, loadTime, loadTime/ledger, ledgers/sec, txns/sec, objs/sec") for line in f: if "Load phase" in line: sequenceIdx = line.find("Sequence : ") @@ -54,12 +84,13 @@ def parseLogs(filename, interval): loadTime = line[loadTimeIdx + len(loadTimeSubstr):txnsIdx] txnsPerSecond = line[txnsIdx + len(txnsSubstr):objsIdx] objsPerSecond = line[objsIdx + len(objsSubstr):-1] - totalTime += float(loadTime); - totalTxns += float(txnCount) - totalObjs += float(objCount) - intervalTime += float(loadTime) - intervalTxns += float(txnCount) - intervalObjs += float(objCount) + if int(txnCount) >= minTxnCount: + totalTime += float(loadTime); + totalTxns += float(txnCount) + totalObjs += float(objCount) + intervalTime += float(loadTime) + intervalTxns += float(txnCount) + intervalObjs += float(objCount) totalLoadTime += float(loadTime) intervalLoadTime += float(loadTime) @@ -71,8 +102,6 @@ def parseLogs(filename, interval): prevEnd = end end = getTime(line) - if end - prevEnd > 3 and prevEnd != 0: - print("Caught up!") if intervalStart == 0: intervalStart = getTime(line) @@ -92,26 +121,30 @@ def parseLogs(filename, interval): if int(sequence) % interval == 0: - print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]") - print(loadTime + " : " - + txnCount + " : " - + objCount + " : " - + txnsPerSecond + " : " - + objsPerSecond) - print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ") - print(str(intervalLedgers) + " : " - + str(intervalEnd - intervalStart) + " : " - + str(intervalLedgersPerSecond) + " : " - + str(intervalLoadTime/intervalLedgers) + " : " - + str(intervalTxns/intervalTime) + " : " - + str(intervalObjs/intervalTime)) - print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]") - print(str(totalLedgers) 
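With the BookOffers restructuring above, the handler accepts either an explicit book base or the usual taker_pays/taker_gets pair. A sketch of the two request shapes built with boost::json; the issuer and book values below are placeholders, not real data:

```cpp
#include <boost/json.hpp>

// Traditional form: currencies/issuers are validated and getBookBase() is
// called to derive the book base.
boost::json::object byCurrency{
    {"command", "book_offers"},
    {"taker_pays", boost::json::object{{"currency", "XRP"}}},
    {"taker_gets",
     boost::json::object{
         {"currency", "USD"},
         {"issuer", "rEXAMPLEissuerAddressxxxxxxxxxxxx"}}}};

// New form: the book base is passed directly as hex and parsed with
// uint256::parseHex, skipping the per-field validation.
boost::json::object byBook{
    {"command", "book_offers"},
    {"book",
     "0000000000000000000000000000000000000000000000000000000000000000"}};
```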
+ " : " - + str(end-start) + " : " - + str(ledgersPerSecond) + " : " - + str(totalLoadTime/totalLedgers) + " : " - + str(totalTxns/totalTime) + " : " - + str(totalObjs/totalTime)) + # print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]") + # print(loadTime + " , " + # + txnCount + " , " + # + objCount + " , " + # + txnsPerSecond + " , " + # + objsPerSecond) + # print("Interval Aggregate ( " + str(interval) + " ) [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ") + print(str(intervalLedgers) + " , " + + str(intervalTxns) + " , " + + str(intervalObjs) + " , " + + str(intervalLoadTime) + " , " + + str(intervalLoadTime/intervalLedgers) + " , " + + str(intervalLedgers/intervalLoadTime) + " , " + + str(intervalTxns/intervalLoadTime) + " , " + + str(intervalObjs/intervalLoadTime)) + # print("Total Aggregate: [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]") + # print(str(totalLedgers) + " , " + # + str(totalTxns) + " , " + # + str(totalObjs) + " , " + # + str(end-start) + " , " + # + str(ledgersPerSecond) + " , " + # + str(totalLoadTime/totalLedgers) + " , " + # + str(totalTxns/totalTime) + " , " + # + str(totalObjs/totalTime)) if int(sequence) % interval == 0: intervalTime = 0 intervalTxns = 0 @@ -120,6 +153,15 @@ def parseLogs(filename, interval): intervalEnd = 0 intervalLedgers = 0 intervalLoadTime = 0 + print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]") + print(totalLedgers) + print(totalLoadTime) + print(str(totalLedgers) + " : " + + str(end-start) + " : " + + str(ledgersPerSecond) + " : " + + str(totalLoadTime/totalLedgers) + " : " + + str(totalTxns/totalTime) + " : " + + str(totalObjs/totalTime)) @@ -127,10 +169,15 @@ def parseLogs(filename, interval): parser = argparse.ArgumentParser(description='parses logs') parser.add_argument("--filename") parser.add_argument("--interval",default=100000) +parser.add_argument("--minTxnCount",default=0) +parser.add_argument("--account_tx",default=False) args = parser.parse_args() def run(args): - parseLogs(args.filename, int(args.interval)) + if args.account_tx: + parseAccountTx(args.filename) + else: + parseLogs(args.filename, int(args.interval)) run(args) diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index f66c4067..58b2388a 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -2,15 +2,8 @@ namespace Backend { BackendIndexer::BackendIndexer(boost::json::object const& config) - : keyShift_(config.at("keyshift").as_int64()) - , bookShift_(config.at("bookshift").as_int64()) + : shift_(config.at("indexer_shift").as_int64()) { - BOOST_LOG_TRIVIAL(info) << "Indexer - starting with keyShift_ = " - << std::to_string(keyShift_); - - BOOST_LOG_TRIVIAL(info) << "Indexer - starting with keyShift_ = " - << std::to_string(bookShift_); - work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; }; @@ -25,11 +18,12 @@ void BackendIndexer::addKey(ripple::uint256 const& key) { keys.insert(key); + keysCumulative.insert(key); } void BackendIndexer::deleteKey(ripple::uint256 const& key) { - keys.erase(key); + keysCumulative.erase(key); } void @@ -37,65 +31,108 @@ BackendIndexer::addBookOffer( ripple::uint256 const& book, ripple::uint256 const& offerKey) { - booksToOffers[book].insert(offerKey); + books[book].insert(offerKey); + booksCumulative[book].insert(offerKey); } void BackendIndexer::deleteBookOffer( ripple::uint256 const& book, 
ripple::uint256 const& offerKey) { - booksToOffers[book].erase(offerKey); - booksToDeletedOffers[book].insert(offerKey); + booksCumulative[book].erase(offerKey); } -std::vector -BackendIndexer::getCurrentOffers(ripple::uint256 const& book) +void +BackendIndexer::clearCaches() { - std::vector offers; - offers.reserve(booksToOffers[book].size() + booksToOffers[book].size()); - - for (auto const& offer : booksToOffers[book]) + keysCumulative = {}; + booksCumulative = {}; +} +void +BackendIndexer::populateCaches(BackendInterface const& backend) +{ + if (keysCumulative.size() > 0) { - offers.push_back(offer); + BOOST_LOG_TRIVIAL(info) + << __func__ << " caches already populated. returning"; + return; + } + auto tip = backend.fetchLatestLedgerSequence(); + if (!tip) + return; + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor] = + backend.fetchLedgerPage(cursor, *tip, 2048); + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + for (auto& obj : objects) + { + keysCumulative.insert(obj.key); + if (isOffer(obj.blob)) + { + auto book = getBook(obj.blob); + booksCumulative[book].insert(obj.key); + } + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } +} + +void +BackendIndexer::writeNext( + uint32_t ledgerSequence, + BackendInterface const& backend) +{ + BOOST_LOG_TRIVIAL(info) + << __func__ + << " starting. sequence = " << std::to_string(ledgerSequence); + bool isFlag = (ledgerSequence % (1 << shift_)) == 0; + if (!backend.fetchLedgerRange()) + { + isFlag = true; } - for(auto const& offer : booksToDeletedOffers[book]) + if (isFlag) { - offers.push_back(offer); - } + uint32_t nextSeq = + ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); + BOOST_LOG_TRIVIAL(info) + << __func__ << " actually doing the write. keysCumulative.size() = " + << std::to_string(keysCumulative.size()); + backend.writeKeys(keysCumulative, nextSeq); + BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys"; - return offers; + backend.writeBooks(booksCumulative, nextSeq); + BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books"; + } } void BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) { - if (ledgerSequence >> keyShift_ << keyShift_ == ledgerSequence) + bool isFlag = ledgerSequence % (1 << shift_) == 0; + if (!backend.fetchLedgerRange()) { - std::unordered_set keysCopy = keys; - boost::asio::post(ioc_, [=, &backend]() { - BOOST_LOG_TRIVIAL(info) << "Indexer - writing keys. Ledger = " - << std::to_string(ledgerSequence); - backend.writeKeys(keysCopy, ledgerSequence); - BOOST_LOG_TRIVIAL(info) << "Indexer - wrote keys. Ledger = " - << std::to_string(ledgerSequence); - }); + isFlag = true; } - if (ledgerSequence >> bookShift_ << bookShift_ == ledgerSequence) - { - std::unordered_map> - booksToOffersCopy = booksToOffers; - std::unordered_map> - booksToDeletedOffersCopy = booksToDeletedOffers; - boost::asio::post(ioc_, [=, &backend]() { - BOOST_LOG_TRIVIAL(info) << "Indexer - writing books. Ledger = " - << std::to_string(ledgerSequence); - backend.writeBooks(booksToOffersCopy, ledgerSequence); - backend.writeBooks(booksToDeletedOffersCopy, ledgerSequence); - BOOST_LOG_TRIVIAL(info) << "Indexer - wrote books. 
Ledger = " - << std::to_string(ledgerSequence); - }); - booksToDeletedOffers = {}; - } -} + uint32_t nextSeq = ((ledgerSequence >> shift_ << shift_) + (1 << shift_)); + uint32_t curSeq = isFlag ? ledgerSequence : nextSeq; + backend.writeKeys(keys, curSeq); + keys = {}; + backend.writeBooks(books, curSeq); + books = {}; + +} // namespace Backend } // namespace Backend diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 987ed4be..dfb5dc14 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -61,26 +61,28 @@ class BackendIndexer std::mutex mutex_; std::optional work_; std::thread ioThread_; - uint32_t keyShift_ = 16; - uint32_t bookShift_ = 16; + uint32_t shift_ = 16; std::unordered_set keys; std::unordered_map> - booksToOffers; + books; + std::unordered_set keysCumulative; std::unordered_map> - booksToDeletedOffers; + booksCumulative; public: BackendIndexer(boost::json::object const& config); ~BackendIndexer(); + void + populateCaches(BackendInterface const& backend); + void + clearCaches(); + void addKey(ripple::uint256 const& key); void deleteKey(ripple::uint256 const& key); - std::vector - getCurrentOffers(ripple::uint256 const& book); - void addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey); void @@ -90,6 +92,13 @@ public: void finish(uint32_t ledgerSequence, BackendInterface const& backend); + void + writeNext(uint32_t ledgerSequence, BackendInterface const& backend); + uint32_t + getShift() + { + return shift_; + } }; class BackendInterface @@ -103,6 +112,26 @@ public: { } + BackendIndexer& + getIndexer() const + { + return indexer_; + } + + std::optional + getIndexOfSeq(uint32_t seq) const + { + if (!fetchLedgerRange()) + return {}; + if (fetchLedgerRange()->minSequence == seq) + return seq; + uint32_t shift = indexer_.getShift(); + uint32_t incr = (1 << shift); + if ((seq % incr) == 0) + return seq; + return (seq >> shift << shift) + incr; + } + virtual std::optional fetchLatestLedgerSequence() const = 0; @@ -216,7 +245,7 @@ public: // Open the database. Set up all of the necessary objects and // datastructures. After this call completes, the database is ready for use. 
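The single shift_ replaces the separate key/book shifts: keys and books are written at "flag" ledgers spaced 2^shift_ apart, and getIndexOfSeq rounds any sequence up to the next such boundary. A small worked example of that arithmetic, assuming the default shift of 16 (the real method additionally special-cases the range's minSequence):

```cpp
#include <cstdint>
#include <iostream>

// Round a ledger sequence up to the next flag ledger (multiple of 2^shift).
std::uint32_t
nextIndex(std::uint32_t seq, std::uint32_t shift = 16)
{
    std::uint32_t const incr = 1u << shift;     // 65536 when shift == 16
    if (seq % incr == 0)
        return seq;                             // already on a flag ledger
    return ((seq >> shift) << shift) + incr;    // round up to the next one
}

int
main()
{
    std::cout << nextIndex(59965440) << '\n';   // 59965440 (915 * 65536)
    std::cout << nextIndex(60000123) << '\n';   // 60030976 (916 * 65536)
}
```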
virtual void - open() = 0; + open(bool readOnly) = 0; // Close the database, releasing any resources virtual void @@ -229,6 +258,7 @@ public: finishWrites(uint32_t ledgerSequence) const { indexer_.finish(ledgerSequence, *this); + indexer_.writeNext(ledgerSequence, *this); return doFinishWrites(); } virtual bool diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index c5adf6f9..a445c393 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -473,129 +473,61 @@ CassandraBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { - auto rng = fetchLedgerRange(); - if (!rng) - return {{}, {}}; - if (!isIndexed(ledgerSequence)) - { - return fetchLedgerPage2(cursor, ledgerSequence, limit); - } + auto index = getIndexOfSeq(ledgerSequence); + if (!index) + return {}; LedgerPage page; - bool cursorIsInt = false; - if (cursor && !cursor->isZero()) - { - bool foundNonZero = false; - for (size_t i = 0; i < 28 && !foundNonZero; ++i) - { - if (cursor->data()[i] != 0) - foundNonZero = true; - } - cursorIsInt = !foundNonZero; - } if (cursor) BOOST_LOG_TRIVIAL(debug) - << __func__ << " - Cursor = " << ripple::strHex(*cursor) - << " : cursorIsInt = " << std::to_string(cursorIsInt); - if (!cursor || !cursorIsInt) - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger"; - CassandraStatement statement{selectKeys_}; - uint32_t upper = ledgerSequence; - if (upper != rng->minSequence) - upper = (ledgerSequence >> indexerShift_) << indexerShift_; - if (upper != ledgerSequence) - upper += (1 << indexerShift_); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " upper is " << std::to_string(upper); - statement.bindInt(upper); - if (cursor) - statement.bindBytes(*cursor); - else - { - ripple::uint256 zero; - statement.bindBytes(zero); - } - statement.bindUInt(limit); - CassandraResult result = executeSyncRead(statement); - BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys"; - if (!!result) - { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - got keys - size = " << result.numRows(); - std::vector keys; - - do - { - keys.push_back(result.getUInt256()); - } while (result.nextRow()); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Using base ledger. Read keys"; - auto objects = fetchLedgerObjects(keys, ledgerSequence); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Using base ledger. 
Got objects"; - if (objects.size() != keys.size()) - throw std::runtime_error( - "Mismatch in size of objects and keys"); - if (keys.size() == limit) - page.cursor = keys[keys.size() - 1]; - else if (ledgerSequence < upper) - page.cursor = upper - 1; - - if (cursor) - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Cursor = " << ripple::strHex(*page.cursor); - - for (size_t i = 0; i < objects.size(); ++i) - { - auto& obj = objects[i]; - auto& key = keys[i]; - if (obj.size()) - { - page.objects.push_back({std::move(key), std::move(obj)}); - } - } - return page; - } - } + << __func__ << " - Cursor = " << ripple::strHex(*cursor); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence) + << " index = " << std::to_string(*index); + CassandraStatement statement{selectKeys_}; + statement.bindInt(*index); + if (cursor) + statement.bindBytes(*cursor); else { - uint32_t curSequence = 0; - for (size_t i = 28; i < 32; ++i) - { - uint32_t digit = cursor->data()[i]; - digit = digit << (8 * (31 - i)); - curSequence += digit; - } + ripple::uint256 zero; + statement.bindBytes(zero); + } + statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys"; + if (!!result) + { BOOST_LOG_TRIVIAL(debug) - << __func__ << " Using ledger diffs. Sequence = " << curSequence - << " size_of uint32_t " << std::to_string(sizeof(uint32_t)) - << " cursor = " << ripple::strHex(*cursor); - auto diff = fetchLedgerDiff(curSequence); - BOOST_LOG_TRIVIAL(debug) << __func__ << " diff size = " << diff.size(); - std::vector deletedKeys; - for (auto& obj : diff) + << __func__ << " - got keys - size = " << result.numRows(); + std::vector keys; + + do { - if (obj.blob.size() == 0) - deletedKeys.push_back(std::move(obj.key)); - } - auto objects = fetchLedgerObjects(deletedKeys, ledgerSequence); - if (objects.size() != deletedKeys.size()) + keys.push_back(result.getUInt256()); + } while (result.nextRow()); + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys"; + auto objects = fetchLedgerObjects(keys, ledgerSequence); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " Using base ledger. 
Got objects"; + if (objects.size() != keys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " deleted keys size = " << deletedKeys.size(); + if (keys.size() == limit) + page.cursor = keys[keys.size() - 1]; + + if (cursor) + BOOST_LOG_TRIVIAL(debug) + << __func__ << " Cursor = " << ripple::strHex(*page.cursor); + for (size_t i = 0; i < objects.size(); ++i) { auto& obj = objects[i]; - auto& key = deletedKeys[i]; + auto& key = keys[i]; if (obj.size()) { page.objects.push_back({std::move(key), std::move(obj)}); } } - if (curSequence - 1 >= ledgerSequence) - page.cursor = curSequence - 1; return page; - // do the diff algorithm } return {{}, {}}; } @@ -651,10 +583,7 @@ CassandraBackend::fetchBookOffers( uint32_t upper = sequence; auto lastPage = rng->maxSequence - (rng->maxSequence % 256); - if (lastPage < sequence) { - keys = indexer_.getCurrentOffers(book); - } - else if (sequence != rng->minSequence) + if (sequence != rng->minSequence) { upper = (sequence >> 8) << 8; if (upper != sequence) @@ -892,7 +821,7 @@ CassandraBackend::writeKeys( std::mutex mtx; std::vector> cbs; cbs.reserve(keys.size()); - uint32_t concurrentLimit = maxRequestsOutstanding / 2; + uint32_t concurrentLimit = maxRequestsOutstanding; uint32_t numSubmitted = 0; for (auto& key : keys) { @@ -978,6 +907,8 @@ CassandraBackend::writeBooks( bool CassandraBackend::isIndexed(uint32_t ledgerSequence) const { + return false; + /* auto rng = fetchLedgerRange(); if (!rng) return false; @@ -992,11 +923,14 @@ CassandraBackend::isIndexed(uint32_t ledgerSequence) const statement.bindUInt(1); CassandraResult result = executeSyncRead(statement); return !!result; + */ } std::optional CassandraBackend::getNextToIndex() const { + return {}; + /* auto rng = fetchLedgerRange(); if (!rng) return {}; @@ -1006,6 +940,7 @@ CassandraBackend::getNextToIndex() const cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_); } return cur; + */ } bool @@ -1202,7 +1137,7 @@ CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const } void -CassandraBackend::open() +CassandraBackend::open(bool readOnly) { std::cout << config_ << std::endl; auto getString = [this](std::string const& field) -> std::string { @@ -1715,6 +1650,7 @@ CassandraBackend::open() setupPreparedStatements = true; } + /* while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -1748,31 +1684,34 @@ CassandraBackend::open() } break; } + */ if (config_.contains("max_requests_outstanding")) { maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); } - // if (config_.contains("run_indexer")) - // { - // if (config_["run_indexer"].as_bool()) - // { - // if (config_.contains("indexer_shift")) - // { - // indexerShift_ = config_["indexer_shift"].as_int64(); - // } - // indexer_ = std::thread{[this]() { - // auto seq = getNextToIndex(); - // if (seq) - // { - // BOOST_LOG_TRIVIAL(info) - // << "Running indexer. Ledger = " << std::to_string(*seq); - // runIndexer(*seq); - // BOOST_LOG_TRIVIAL(info) << "Ran indexer"; - // } - // }}; - // } - // } + /* + if (config_.contains("run_indexer")) + { + if (config_["run_indexer"].as_bool()) + { + if (config_.contains("indexer_shift")) + { + indexerShift_ = config_["indexer_shift"].as_int64(); + } + indexer_ = std::thread{[this]() { + auto seq = getNextToIndex(); + if (seq) + { + BOOST_LOG_TRIVIAL(info) + << "Running indexer. 
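The rewritten fetchLedgerPage resolves the request to the nearest flag-ledger index, reads a page of keys from that index, then fetches the objects at the requested sequence. A sketch of how a caller walks a complete ledger with it, mirroring populateCaches above but omitting the DatabaseTimeout retry; backend, ledgerSequence, and the page size are assumed to be in scope:

```cpp
// Illustrative paging loop: keep passing the returned cursor back in until
// the backend stops returning one.
std::optional<ripple::uint256> cursor;
std::size_t totalObjects = 0;
do
{
    auto [objects, nextCursor] =
        backend.fetchLedgerPage(cursor, ledgerSequence, 2048);
    totalObjects += objects.size();
    cursor = nextCursor;
} while (cursor);
```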
Ledger = " << std::to_string(*seq); + runIndexer(*seq); + BOOST_LOG_TRIVIAL(info) << "Ran indexer"; + } + }}; + } + } + */ work_.emplace(ioContext_); ioThread_ = std::thread{[this]() { ioContext_.run(); }}; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 0c3498f4..9f9a027c 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -659,9 +659,6 @@ private: std::optional work_; std::thread ioThread_; - // std::thread indexer_; - uint32_t indexerShift_ = 16; - // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded uint32_t maxRequestsOutstanding = 10000; @@ -711,7 +708,7 @@ public: // Create the table if it doesn't exist already // @param createIfMissing ignored void - open() override; + open(bool readOnly) override; // Close the connection to the database void @@ -721,8 +718,6 @@ public: std::lock_guard lock(mutex_); work_.reset(); ioThread_.join(); - // if (indexer_.joinable()) - // indexer_.join(); } open_ = false; } @@ -1216,8 +1211,6 @@ public: , isDeleted(isDeleted) , book(std::move(inBook)) { - if (book) - ++refs; } }; struct WriteAccountTxCallbackData diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index b61c1281..fc508de1 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -618,7 +618,7 @@ ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects) auto [status, data] = source->fetchLedger(ledgerSequence, getObjects); response = std::move(data); - if (status.ok() && response.validated()) + if (status.ok() && (response.validated()|| true)) { BOOST_LOG_TRIVIAL(info) << "Successfully fetched ledger = " << ledgerSequence @@ -815,7 +815,7 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence) << __func__ << " : " << "Attempting to execute func. ledger sequence = " << ledgerSequence << " - source = " << source->toString(); - if (source->hasLedger(ledgerSequence)) + if (source->hasLedger(ledgerSequence)|| true) { bool res = f(source); if (res) diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 1fbc6416..77d38927 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -282,6 +282,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status; disconnect(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records; throw std::runtime_error(ss.str()); } } @@ -748,7 +749,16 @@ CREATE TABLE IF NOT EXISTS objects ( ledger_seq bigint NOT NULL, object bytea, PRIMARY KEY(key, ledger_seq) -); +) PARTITION BY RANGE (ledger_seq); + +create table if not exists objects1 partition of objects for values from (0) to (10000000); +create table if not exists objects2 partition of objects for values from (10000000) to (20000000); +create table if not exists objects3 partition of objects for values from (20000000) to (30000000); +create table if not exists objects4 partition of objects for values from (30000000) to (40000000); +create table if not exists objects5 partition of objects for values from (40000000) to (50000000); +create table if not exists objects6 partition of objects for values from (50000000) to (60000000); +create table if not exists objects7 partition of objects for values from (60000000) to (70000000); + -- Index for lookups by ledger hash. CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers @@ -757,34 +767,163 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers -- Transactions table. 
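The schema above range-partitions objects (and, further down, transactions and account_transactions) into 10-million-ledger slices, with inclusive lower and exclusive upper bounds. A hypothetical helper showing which partition suffix a given sequence lands in, for illustration only:

```cpp
#include <cstdint>

// objects1 covers [0, 10000000), objects2 covers [10000000, 20000000), ...
int
partitionSuffix(std::uint64_t ledgerSeq)
{
    return static_cast<int>(ledgerSeq / 10'000'000) + 1;  // 60000000 -> 7
}
```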
Deletes from the ledger table -- cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS transactions ( - hash bytea PRIMARY KEY, - ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, + hash bytea NOT NULL, + ledger_seq bigint NOT NULL , transaction bytea NOT NULL, metadata bytea NOT NULL -); --- Index for lookups by ledger hash. -CREATE INDEX IF NOT EXISTS ledgers_ledger_seq_idx ON transactions - USING hash (ledger_seq); +) PARTITION BY RANGE(ledger_seq); +create table if not exists transactions1 partition of transactions for values from (0) to (10000000); +create table if not exists transactions2 partition of transactions for values from (10000000) to (20000000); +create table if not exists transactions3 partition of transactions for values from (20000000) to (30000000); +create table if not exists transactions4 partition of transactions for values from (30000000) to (40000000); +create table if not exists transactions5 partition of transactions for values from (40000000) to (50000000); +create table if not exists transactions6 partition of transactions for values from (50000000) to (60000000); +create table if not exists transactions7 partition of transactions for values from (60000000) to (70000000); + +create index if not exists tx_by_hash on transactions using hash (hash); +create index if not exists tx_by_lgr_seq on transactions using hash (ledger_seq); -- Table that maps accounts to transactions affecting them. Deletes from the -- ledger table cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS account_transactions ( account bytea NOT NULL, - ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, + ledger_seq bigint NOT NULL , transaction_index bigint NOT NULL, hash bytea NOT NULL, - PRIMARY KEY (account, ledger_seq, transaction_index) -); + PRIMARY KEY (account, ledger_seq, transaction_index, hash) +) PARTITION BY RANGE (ledger_seq); +create table if not exists account_transactions1 partition of account_transactions for values from (0) to (10000000); +create table if not exists account_transactions2 partition of account_transactions for values from (10000000) to (20000000); +create table if not exists account_transactions3 partition of account_transactions for values from (20000000) to (30000000); +create table if not exists account_transactions4 partition of account_transactions for values from (30000000) to (40000000); +create table if not exists account_transactions5 partition of account_transactions for values from (40000000) to (50000000); +create table if not exists account_transactions6 partition of account_transactions for values from (50000000) to (60000000); +create table if not exists account_transactions7 partition of account_transactions for values from (60000000) to (70000000); + -- Table that maps a book to a list of offers in that book. Deletes from the ledger table -- cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS books ( - book bytea NOT NULL, ledger_seq bigint NOT NULL, - deleted boolean NOT NULL, - offer_key bytea NOT NULL, - PRIMARY KEY(book, offer_key, deleted) + book bytea NOT NULL, + offer_key bytea NOT NULL ); +CREATE INDEX book_idx ON books using btree(ledger_seq, book, offer_key); + +CREATE TABLE IF NOT EXISTS keys ( + ledger_seq bigint NOT NULL, + key bytea NOT NULL +); + +CREATE INDEX key_idx ON keys USING btree(ledger_seq, key); + +-- account_tx() RPC helper. From the rippled reporting process, only the +-- parameters without defaults are required. 
For the parameters with +-- defaults, validation should be done by rippled, such as: +-- _in_account_id should be a valid xrp base58 address. +-- _in_forward either true or false according to the published api +-- _in_limit should be validated and not simply passed through from +-- client. +-- +-- For _in_ledger_index_min and _in_ledger_index_max, if passed in the +-- request, verify that their type is int and pass through as is. +-- For _ledger_hash, verify and convert from hex length 32 bytes and +-- prepend with \x (\\x C++). +-- +-- For _in_ledger_index, if the input type is integer, then pass through +-- as is. If the type is string and contents = validated, then do not +-- set _in_ledger_index. Instead set _in_invalidated to TRUE. +-- +-- There is no need for rippled to do any type of lookup on max/min +-- ledger range, lookup of hash, or the like. This functions does those +-- things, including error responses if bad input. Only the above must +-- be done to set the correct search range. +-- +-- If a marker is present in the request, verify the members 'ledger' +-- and 'seq' are integers and they correspond to _in_marker_seq +-- _in_marker_index. +-- To reiterate: +-- JSON input field 'ledger' corresponds to _in_marker_seq +-- JSON input field 'seq' corresponds to _in_marker_index +CREATE OR REPLACE FUNCTION account_tx( + _in_account_id bytea, + _in_limit bigint, + _in_marker_seq bigint DEFAULT NULL::bigint, + _in_marker_index bigint DEFAULT NULL::bigint) +RETURNS jsonb +AS $$ +DECLARE + _min bigint; + _max bigint; + _marker bool; + _between_min bigint; + _between_max bigint; + _sql text; + _cursor refcursor; + _result jsonb; + _record record; + _tally bigint := 0; + _ret_marker jsonb; + _transactions jsonb[] := '{}'; +BEGIN + _min := min_ledger(); + _max := max_ledger(); + IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN + _marker := TRUE; + IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN + -- The rippled implementation returns no transaction results + -- if either of these values are missing. + _between_min := 0; + _between_max := 0; + ELSE + _between_min := _min; + _between_max := _in_marker_seq; + END IF; + ELSE + _marker := FALSE; + _between_min := _min; + _between_max := _max; + END IF; + + + _sql := format('SELECT hash, ledger_seq, transaction_index FROM account_transactions WHERE account = $1 + AND ledger_seq BETWEEN $2 AND $3 ORDER BY ledger_seq DESC, transaction_index DESC'); + + OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min, _between_max; + LOOP + FETCH _cursor INTO _record; + IF _record IS NULL THEN EXIT; END IF; + IF _marker IS TRUE THEN + IF _in_marker_seq = _record.ledger_seq THEN + IF _in_marker_index < _record.transaction_index THEN + CONTINUE; + END IF; + END IF; + _marker := FALSE; + END IF; + _tally := _tally + 1; + IF _tally > _in_limit THEN + _ret_marker := jsonb_build_object( + 'ledger_sequence', _record.ledger_seq, + 'transaction_index', _record.transaction_index); + EXIT; + END IF; + + -- Is the transaction index in the tx object? + _transactions := _transactions || jsonb_build_object('hash',_record.hash); + END LOOP; + CLOSE _cursor; + + _result := jsonb_build_object('ledger_index_min', _min, + 'ledger_index_max', _max, + 'transactions', _transactions); + IF _ret_marker IS NOT NULL THEN + _result := _result || jsonb_build_object('cursor', _ret_marker); + END IF; + RETURN _result; +END; +$$ LANGUAGE plpgsql; + -- Avoid inadvertent administrative tampering with committed data. 
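The comment block above spells out the marker contract: the returned cursor's ledger_sequence/transaction_index are fed back as _in_marker_seq/_in_marker_index to resume the scan. A sketch of that paging loop from the C++ side, using the fetchAccountTransactions wrapper that appears later in this diff; process() is a hypothetical consumer:

```cpp
std::optional<AccountTransactionsCursor> cursor;
while (true)
{
    // Each call maps cursor->ledgerSequence / cursor->transactionIndex onto
    // the $3/$4 parameters of account_tx().
    auto [txns, nextCursor] =
        backend.fetchAccountTransactions(account, 200, cursor);
    process(txns);          // hypothetical consumer of the fetched blobs
    if (!nextCursor)
        break;              // no cursor returned => last page reached
    cursor = nextCursor;
}
```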
CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO ledgers DO INSTEAD NOTHING; diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 4085cc44..7d6bc4c5 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -1,3 +1,4 @@ +#include #include #include namespace Backend { @@ -7,6 +8,10 @@ PostgresBackend::PostgresBackend(boost::json::object const& config) , pgPool_(make_PgPool(config)) , writeConnection_(pgPool_) { + if (config.contains("write_interval")) + { + writeInterval_ = config.at("write_interval").as_int64(); + } } void PostgresBackend::writeLedger( @@ -68,18 +73,15 @@ PostgresBackend::doWriteLedgerObject( numRowsInObjectsBuffer_++; // If the buffer gets too large, the insert fails. Not sure why. So we // insert after 1 million records - if (numRowsInObjectsBuffer_ % 1000000 == 0) + if (numRowsInObjectsBuffer_ % writeInterval_ == 0) { + BOOST_LOG_TRIVIAL(info) + << __func__ << " Flushing large buffer. num objects = " + << numRowsInObjectsBuffer_; writeConnection_.bulkInsert("objects", objectsBuffer_.str()); + BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer"; objectsBuffer_.str(""); } - - if (book) - { - booksBuffer_ << "\\\\x" << ripple::strHex(*book) << '\t' - << std::to_string(seq) << '\t' << isDeleted << '\t' - << "\\\\x" << ripple::strHex(key) << '\n'; - } } void @@ -327,30 +329,40 @@ PostgresBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { + auto index = getIndexOfSeq(ledgerSequence); + if (!index) + return {}; PgQuery pgQuery(pgPool_); pgQuery("SET statement_timeout TO 10000"); std::stringstream sql; - sql << "SELECT key,object FROM" - << " (SELECT DISTINCT ON (key) * FROM objects" - << " WHERE ledger_seq <= " << std::to_string(ledgerSequence); + sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index); if (cursor) sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY key DESC, ledger_seq DESC) sub" - << " WHERE object != \'\\x\'" - << " LIMIT " << std::to_string(limit); + sql << " ORDER BY key DESC LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 2)) + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys"; + std::optional returnCursor; + if (size_t numRows = checkResult(res, 1)) { - std::vector objects; + std::vector keys; for (size_t i = 0; i < numRows; ++i) { - objects.push_back({res.asUInt256(i, 0), res.asUnHexedBlob(i, 1)}); + keys.push_back({res.asUInt256(i, 0)}); } if (numRows == limit) - return {objects, objects[objects.size() - 1].key}; - else - return {objects, {}}; + returnCursor = keys.back(); + + auto objs = fetchLedgerObjects(keys, ledgerSequence); + std::vector results; + for (size_t i = 0; i < objs.size(); ++i) + { + if (objs[i].size()) + { + results.push_back({keys[i], objs[i]}); + } + } + return {results, returnCursor}; } return {}; } @@ -364,15 +376,12 @@ PostgresBackend::fetchBookOffers( { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT offer_key FROM" - << " (SELECT DISTINCT ON (offer_key) * FROM books WHERE book = " + sql << "SELECT offer_key FROM books WHERE book = " << "\'\\x" << ripple::strHex(book) - << "\' AND ledger_seq <= " << std::to_string(ledgerSequence); + << "\' AND ledger_seq = " << std::to_string(ledgerSequence); if (cursor) sql << " AND offer_key < \'\\x" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY offer_key DESC, ledger_seq DESC)" - << " sub 
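The write_interval change above makes the flush threshold configurable; the underlying pattern is to accumulate COPY rows in a stringstream and flush in batches, because one very large COPY payload fails. A self-contained sketch of that pattern, with writeRow and bulkInsert as caller-supplied stand-ins for the real formatting and PgQuery::bulkInsert calls:

```cpp
#include <sstream>
#include <vector>

template <class Row, class WriteRow, class BulkInsert>
void
bufferedCopy(
    std::vector<Row> const& rows,
    std::size_t writeInterval,
    WriteRow writeRow,
    BulkInsert bulkInsert)
{
    std::stringstream buffer;
    std::size_t numRows = 0;
    for (auto const& row : rows)
    {
        writeRow(buffer, row);              // append one tab-separated line
        if (++numRows % writeInterval == 0)
        {
            bulkInsert(buffer.str());       // flush a full batch
            buffer.str("");
            buffer.clear();
        }
    }
    if (numRows % writeInterval != 0)
        bulkInsert(buffer.str());           // flush the remainder
}
```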
WHERE NOT deleted" - << " ORDER BY offer_key DESC " + sql << " ORDER BY offer_key DESC, ledger_seq DESC" << " LIMIT " << std::to_string(limit); BOOST_LOG_TRIVIAL(debug) << sql.str(); auto res = pgQuery(sql.str().data()); @@ -412,34 +421,88 @@ std::vector PostgresBackend::fetchTransactions( std::vector const& hashes) const { - PgQuery pgQuery(pgPool_); - pgQuery("SET statement_timeout TO 10000"); - std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_seq FROM transactions " - "WHERE "; - bool first = true; - for (auto const& hash : hashes) + std::vector results; + constexpr bool doAsync = true; + if (doAsync) { - if (!first) - sql << " OR "; - sql << "HASH = \'\\x" << ripple::strHex(hash) << "\'"; - first = false; - } - auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 3)) - { - std::vector results; - for (size_t i = 0; i < numRows; ++i) + auto start = std::chrono::system_clock::now(); + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + results.resize(hashes.size()); + std::condition_variable cv; + std::mutex mtx; + std::atomic_uint numRemaining = hashes.size(); + for (size_t i = 0; i < hashes.size(); ++i) { - results.push_back( - {res.asUnHexedBlob(i, 0), - res.asUnHexedBlob(i, 1), - res.asBigInt(i, 2)}); - } - return results; - } + auto const& hash = hashes[i]; + boost::asio::post( + pool_, [this, &hash, &results, &numRemaining, &cv, &mtx, i]() { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " getting txn = " << i; + PgQuery pgQuery(pgPool_); + std::stringstream sql; + sql << "SELECT transaction,metadata,ledger_seq FROM " + "transactions " + "WHERE HASH = \'\\x" + << ripple::strHex(hash) << "\'"; - return {}; + auto res = pgQuery(sql.str().data()); + if (size_t numRows = checkResult(res, 3)) + { + results[i] = { + res.asUnHexedBlob(0, 0), + res.asUnHexedBlob(0, 1), + res.asBigInt(0, 2)}; + } + if (--numRemaining == 0) + { + std::unique_lock lck(mtx); + cv.notify_one(); + } + }); + } + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + auto end2 = std::chrono::system_clock::now(); + duration = ((end2 - end).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " fetched " << std::to_string(hashes.size()) + << " transactions with threadpool. took " + << std::to_string(duration); + } + else + { + PgQuery pgQuery(pgPool_); + pgQuery("SET statement_timeout TO 10000"); + std::stringstream sql; + for (size_t i = 0; i < hashes.size(); ++i) + { + auto const& hash = hashes[i]; + sql << "SELECT transaction,metadata,ledger_seq FROM " + "transactions " + "WHERE HASH = \'\\x" + << ripple::strHex(hash) << "\'"; + if (i + 1 < hashes.size()) + sql << " UNION ALL "; + } + auto start = std::chrono::system_clock::now(); + auto res = pgQuery(sql.str().data()); + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " fetched " << std::to_string(hashes.size()) + << " transactions with union all. 
took " + << std::to_string(duration); + if (size_t numRows = checkResult(res, 3)) + { + for (size_t i = 0; i < numRows; ++i) + results.push_back( + {res.asUnHexedBlob(i, 0), + res.asUnHexedBlob(i, 1), + res.asBigInt(i, 2)}); + } + } + return results; } std::vector @@ -449,40 +512,47 @@ PostgresBackend::fetchLedgerObjects( { PgQuery pgQuery(pgPool_); pgQuery("SET statement_timeout TO 10000"); - std::stringstream sql; - sql << "SELECT DISTINCT ON(key) object FROM objects WHERE"; - - bool first = true; - for (auto const& key : keys) + std::vector results; + results.resize(keys.size()); + std::condition_variable cv; + std::mutex mtx; + std::atomic_uint numRemaining = keys.size(); + auto start = std::chrono::system_clock::now(); + for (size_t i = 0; i < keys.size(); ++i) { - if (!first) - { - sql << " OR "; - } - else - { - sql << " ( "; - first = false; - } - sql << " key = " - << "\'\\x" << ripple::strHex(key) << "\'"; - } - sql << " ) " - << " AND ledger_seq <= " << std::to_string(sequence) - << " ORDER BY key DESC, ledger_seq DESC"; + auto const& key = keys[i]; + boost::asio::post( + pool_, + [this, &key, &results, &numRemaining, &cv, &mtx, i, sequence]() { + PgQuery pgQuery(pgPool_); + std::stringstream sql; + sql << "SELECT object FROM " + "objects " + "WHERE key = \'\\x" + << ripple::strHex(key) << "\'" + << " AND ledger_seq <= " << std::to_string(sequence) + << " ORDER BY ledger_seq DESC LIMIT 1"; - BOOST_LOG_TRIVIAL(trace) << sql.str(); - auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 1)) - { - std::vector results; - for (size_t i = 0; i < numRows; ++i) - { - results.push_back(res.asUnHexedBlob(i, 0)); - } - return results; + auto res = pgQuery(sql.str().data()); + if (size_t numRows = checkResult(res, 1)) + { + results[i] = res.asUnHexedBlob(0, 0); + } + if (--numRemaining == 0) + { + std::unique_lock lck(mtx); + cv.notify_one(); + } + }); } - return {}; + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " fetched " << std::to_string(keys.size()) + << " objects with threadpool. 
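fetchTransactions and fetchLedgerObjects above share the same fan-out shape: post one query per item to the thread pool, decrement an atomic counter on each completion, and wake the caller through a condition_variable when it reaches zero. A generic sketch of just that synchronization, with doWork standing in for the per-item Postgres query:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

template <class Work>
void
fanOut(boost::asio::thread_pool& pool, std::size_t count, Work doWork)
{
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic_uint remaining = static_cast<unsigned>(count);

    for (std::size_t i = 0; i < count; ++i)
    {
        boost::asio::post(pool, [i, &doWork, &remaining, &cv, &mtx]() {
            doWork(i);                      // e.g. run one SELECT, fill results[i]
            if (--remaining == 0)
            {
                std::unique_lock<std::mutex> lck(mtx);
                cv.notify_one();
            }
        });
    }

    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [&remaining]() { return remaining == 0; });
}
```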
took " << std::to_string(duration); + return results; } std::pair< @@ -495,44 +565,77 @@ PostgresBackend::fetchAccountTransactions( { PgQuery pgQuery(pgPool_); pgQuery("SET statement_timeout TO 10000"); - std::stringstream sql; - sql << "SELECT hash, ledger_seq, transaction_index FROM " - "account_transactions WHERE account = " - << "\'\\x" << ripple::strHex(account) << "\'"; - if (cursor) - sql << " AND (ledger_seq < " << cursor->ledgerSequence - << " OR (ledger_seq = " << cursor->ledgerSequence - << " AND transaction_index < " << cursor->transactionIndex << "))"; - sql << " ORDER BY ledger_seq DESC, transaction_index DESC"; - sql << " LIMIT " << std::to_string(limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << sql.str(); - auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 3)) - { - std::vector hashes; - for (size_t i = 0; i < numRows; ++i) - { - hashes.push_back(res.asUInt256(i, 0)); - } + pg_params dbParams; - if (numRows == limit) - { - AccountTransactionsCursor retCursor{ - res.asBigInt(numRows - 1, 1), res.asBigInt(numRows - 1, 2)}; - return {fetchTransactions(hashes), {retCursor}}; - } - else - { - return {fetchTransactions(hashes), {}}; - } + char const*& command = dbParams.first; + std::vector>& values = dbParams.second; + command = + "SELECT account_tx($1::bytea, $2::bigint, " + "$3::bigint, $4::bigint)"; + values.resize(4); + values[0] = "\\x" + strHex(account); + + values[1] = std::to_string(limit); + + if (cursor) + { + values[2] = std::to_string(cursor->ledgerSequence); + values[3] = std::to_string(cursor->transactionIndex); } - return {}; -} + for (size_t i = 0; i < values.size(); ++i) + { + BOOST_LOG_TRIVIAL(debug) << "value " << std::to_string(i) << " = " + << (values[i] ? values[i].value() : "null"); + } + + auto start = std::chrono::system_clock::now(); + auto res = pgQuery(dbParams); + auto end = std::chrono::system_clock::now(); + + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " : executed stored_procedure in " + << std::to_string(duration) + << " num records = " << std::to_string(checkResult(res, 1)); + checkResult(res, 1); + + char const* resultStr = res.c_str(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " + << "postgres result = " << resultStr + << " : account = " << strHex(account); + + boost::json::value raw = boost::json::parse(resultStr); + boost::json::object responseObj = raw.as_object(); + BOOST_LOG_TRIVIAL(debug) << " parsed = " << responseObj; + if (responseObj.contains("transactions")) + { + auto txns = responseObj.at("transactions").as_array(); + std::vector hashes; + for (auto& hashHex : txns) + { + ripple::uint256 hash; + if (hash.parseHex(hashHex.at("hash").as_string().c_str() + 2)) + hashes.push_back(hash); + } + if (responseObj.contains("cursor")) + { + return { + fetchTransactions(hashes), + {{responseObj.at("cursor").at("ledger_sequence").as_int64(), + responseObj.at("cursor") + .at("transaction_index") + .as_int64()}}}; + } + return {fetchTransactions(hashes), {}}; + } + return {{}, {}}; +} // namespace Backend void -PostgresBackend::open() +PostgresBackend::open(bool readOnly) { - initSchema(pgPool_); + if (!readOnly) + initSchema(pgPool_); } void @@ -560,7 +663,6 @@ PostgresBackend::doFinishWrites() const if (!abortWrite_) { writeConnection_.bulkInsert("transactions", transactionsBuffer_.str()); - writeConnection_.bulkInsert("books", booksBuffer_.str()); writeConnection_.bulkInsert( "account_transactions", accountTxBuffer_.str()); std::string objectsStr 
= objectsBuffer_.str(); @@ -590,7 +692,9 @@ PostgresBackend::writeKeys( std::unordered_set const& keys, uint32_t ledgerSequence) const { + BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); + pgQuery("BEGIN"); std::stringstream keysBuffer; size_t numRows = 0; for (auto& key : keys) @@ -603,7 +707,8 @@ PostgresBackend::writeKeys( if (numRows == 1000000) { pgQuery.bulkInsert("keys", keysBuffer.str()); - keysBuffer.str(""); + std::stringstream temp; + keysBuffer.swap(temp); numRows = 0; } } @@ -611,6 +716,8 @@ PostgresBackend::writeKeys( { pgQuery.bulkInsert("keys", keysBuffer.str()); } + pgQuery("COMMIT"); + return true; } bool PostgresBackend::writeBooks( @@ -619,15 +726,17 @@ PostgresBackend::writeBooks( std::unordered_set> const& books, uint32_t ledgerSequence) const { + BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); + pgQuery("BEGIN"); std::stringstream booksBuffer; size_t numRows = 0; for (auto& book : books) { for (auto& offer : book.second) { - booksBuffer << "\\\\x" << ripple::strHex(book.first) << '\t' - << std::to_string(ledgerSequence) << '\t' << "\\\\x" + booksBuffer << std::to_string(ledgerSequence) << '\t' << "\\\\x" + << ripple::strHex(book.first) << '\t' << "\\\\x" << ripple::strHex(offer) << '\n'; numRows++; // If the buffer gets too large, the insert fails. Not sure why. So @@ -635,7 +744,8 @@ PostgresBackend::writeBooks( if (numRows == 1000000) { pgQuery.bulkInsert("books", booksBuffer.str()); - booksBuffer.str(""); + std::stringstream temp; + booksBuffer.swap(temp); numRows = 0; } } @@ -644,6 +754,8 @@ PostgresBackend::writeBooks( { pgQuery.bulkInsert("books", booksBuffer.str()); } + pgQuery("COMMIT"); + return true; } bool PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const @@ -703,12 +815,13 @@ PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const } else { - // This is rather unelegant. For a deleted object, we don't - // know its type just from the key (or do we?). So, we just - // assume it is an offer and try to delete it. The - // alternative is to read the actual object out of the db - // from before it was deleted. This could result in a lot of - // individual reads though, so we chose to just delete + // This is rather unelegant. For a deleted object, we + // don't know its type just from the key (or do we?). + // So, we just assume it is an offer and try to delete + // it. The alternative is to read the actual object out + // of the db from before it was deleted. 
This could + // result in a lot of individual reads though, so we + // chose to just delete deleteOffer = true; } if (deleteOffer) diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index ca9ce8c1..30c2df01 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -15,6 +15,8 @@ private: std::shared_ptr pgPool_; mutable PgQuery writeConnection_; mutable bool abortWrite_ = false; + mutable boost::asio::thread_pool pool_{200}; + uint32_t writeInterval_ = 1000000; public: PostgresBackend(boost::json::object const& config); @@ -99,7 +101,7 @@ public: std::vector&& data) const override; void - open() override; + open(bool readOnly) override; void close() override; diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 85847491..2ae2be7b 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -294,7 +294,24 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) std::move(bookDir)); } flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - bool success = flatMapBackend_->finishWrites(lgrInfo.seq); + accumTxns_ += rawData.transactions_list().transactions_size(); + bool success = true; + if (accumTxns_ > txnThreshold_) + { + auto start = std::chrono::system_clock::now(); + success = flatMapBackend_->finishWrites(lgrInfo.seq); + auto end = std::chrono::system_clock::now(); + + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " Accumulated " << std::to_string(accumTxns_) + << " transactions. Wrote in " << std::to_string(duration) + << " transactions per second = " + << std::to_string(accumTxns_ / duration); + accumTxns_ = 0; + } + else + BOOST_LOG_TRIVIAL(info) << __func__ << " skipping commit"; BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Inserted/modified/deleted all objects. Number of objects = " @@ -310,6 +327,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) std::optional ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) { + if (startSequence > finishSequence_) + return {}; /* * Behold, mortals! This function spawns three separate threads, which talk * to each other via 2 different thread safe queues and 1 atomic variable. @@ -342,6 +361,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) assert(false); throw std::runtime_error("runETLPipeline: parent ledger is null"); } + BOOST_LOG_TRIVIAL(info) << __func__ << " : " + << "Populating caches"; + flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_); + BOOST_LOG_TRIVIAL(info) << __func__ << " : " + << "Populated caches"; std::atomic_bool writeConflict = false; std::optional lastPublishedSequence; @@ -379,7 +403,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) // ETL mechanism should stop. The other stopping condition is if // the entire server is shutting down. This can be detected in a // variety of ways. 
See the comment at the top of the function - while (networkValidatedLedgers_.waitUntilValidatedByNetwork( + while (currentSequence <= finishSequence_ && + networkValidatedLedgers_.waitUntilValidatedByNetwork( currentSequence) && !writeConflict && !isStopping()) { @@ -416,6 +441,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) transformQueue->push(std::move(fetchResponse)); currentSequence += numExtractors; + if (currentSequence > finishSequence_) + break; } // empty optional tells the transformer to shut down transformQueue->push({}); @@ -497,6 +524,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Extracted and wrote " << *lastPublishedSequence - startSequence << " in " << ((end - begin).count()) / 1000000000.0; writing_ = false; + flatMapBackend_->getIndexer().clearCaches(); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Stopping etl pipeline"; @@ -581,7 +609,6 @@ ReportingETL::monitor() } else { - // publishLedger(ledger); } uint32_t nextSequence = latestSequence.value() + 1; @@ -680,14 +707,18 @@ ReportingETL::ReportingETL( networkValidatedLedgers_, ioc) { - flatMapBackend_->open(); if (config.contains("start_sequence")) startSequence_ = config.at("start_sequence").as_int64(); + if (config.contains("finish_sequence")) + finishSequence_ = config.at("finish_sequence").as_int64(); if (config.contains("read_only")) readOnly_ = config.at("read_only").as_bool(); if (config.contains("online_delete")) onlineDeleteInterval_ = config.at("online_delete").as_int64(); if (config.contains("extractor_threads")) extractorThreads_ = config.at("extractor_threads").as_int64(); + if (config.contains("txn_threshold")) + txnThreshold_ = config.at("txn_threshold").as_int64(); + flatMapBackend_->open(readOnly_); } diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 3562410b..5c4da180 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -131,6 +131,10 @@ private: /// the next ledger validated by the network. If this is set, and the /// database is already populated, an error is thrown. std::optional startSequence_; + std::optional finishSequence_; + + size_t accumTxns_ = 0; + size_t txnThreshold_ = 0; /// The time that the most recently published ledger was published. 
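# --- Illustrative sketch (not part of the patch) ---
# The ReportingETL constructor above now reads several optional config keys.
# Only the key names below come from the diff ("start_sequence",
# "finish_sequence", "read_only", "online_delete", "extractor_threads",
# "txn_threshold"); every value shown is a made-up example, not a recommended
# setting.
import json

example_etl_config = {
    "start_sequence": 32570,      # first ledger to load; errors if the DB is already populated
    "finish_sequence": 33000,     # stop the ETL pipeline once this ledger is reached
    "read_only": False,           # passed through to flatMapBackend_->open(readOnly_)
    "online_delete": 0,           # online delete interval
    "extractor_threads": 8,       # number of extractor threads
    "txn_threshold": 1000,        # accumulate this many transactions before finishWrites
}
print(json.dumps(example_etl_config, indent=4))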
Used by /// server_info diff --git a/test.py b/test.py index 82035bad..a71640cf 100755 --- a/test.py +++ b/test.py @@ -62,8 +62,6 @@ def compareAccountTx(aldous, p2p): p2pMetas = [] p2pLedgerSequences = [] for x in p2p["transactions"]: - if int(x["ledger_index"]) > maxLedger: - continue p2pTxns.append(x["tx_blob"]) p2pMetas.append(x["meta"]) p2pLedgerSequences.append(x["ledger_index"]) @@ -71,8 +69,6 @@ def compareAccountTx(aldous, p2p): aldousMetas = [] aldousLedgerSequences = [] for x in aldous["transactions"]: - if int(x["ledger_sequence"]) < minLedger: - continue aldousTxns.append(x["transaction"]) aldousMetas.append(x["metadata"]) aldousLedgerSequences.append(x["ledger_sequence"]) @@ -95,6 +91,47 @@ def compareAccountTx(aldous, p2p): print(len(p2p["transactions"])) print(maxLedger) +def getAccounts(filename): + accounts = [] + with open(filename) as f: + for line in f: + if line[0] == "{": + jv = json.loads(line) + accounts.append(jv["Account"]) + if len(line) == 35: + accounts.append(line[0:34]) + if len(line) == 44: + accounts.append(line[3:43]) + if len(line) == 65: + accounts.append(line[0:64]) + if len(line) == 41 or len(line) == 40: + accounts.append(line[0:40]) + elif len(line) == 43: + accounts.append(line[2:42]) + return accounts +def getAccountsAndCursors(filename): + accounts = [] + cursors = [] + with open(filename) as f: + for line in f: + if len(line) == 0: + continue + space = line.find(" ") + cursor = line[space+1:len(line)-1] + if cursor == "None": + cursors.append(None) + else: + cursors.append(json.loads(cursor)) + accounts.append(line[0:space]) + + return (accounts,cursors) +def getBooks(filename): + books = [] + with open(filename) as f: + for line in f: + if len(line) == 68: + books.append(line[3:67]) + return books def compareLedgerData(aldous, p2p): aldous[0].sort() aldous[1].sort() @@ -118,6 +155,24 @@ def compareLedgerData(aldous, p2p): +async def account_infos(ip, port, accounts, numCalls): + + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(accounts)) + for x in range(0,numCalls): + account = accounts[random.randrange(0,len(accounts))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"account_info","account":account,"binary":True})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + + except websockets.exceptions.connectionclosederror as e: + print(e) async def account_info(ip, port, account, ledger, binary): @@ -141,7 +196,6 @@ def getMinAndMax(res): minSeq = None maxSeq = None for x in res["transactions"]: - print(x) seq = None if "ledger_sequence" in x: seq = int(x["ledger_sequence"]) @@ -165,11 +219,90 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None): await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger})) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) + #print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.ConnectionClosedError as e: print(e) +async def account_txs_full(ip, port, accounts, cursors, numCalls, limit): + + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(accounts)) + cursor = None + account = None + time 
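# --- Illustrative sketch (not part of the patch) ---
# account_infos above, like the other new load-test helpers, repeats one shape:
# pick a random input, time a single websocket round trip with datetime
# timestamps, and flag anything slower than 100ms. timed_request below is a
# hypothetical distilled version of that inner step; it is not a helper defined
# elsewhere in this patch.
import datetime
import json


async def timed_request(ws, request, threshold=0.1):
    # Send one JSON command over an already-open websocket and return
    # (response, latency_in_seconds), printing a warning for slow calls.
    start = datetime.datetime.now().timestamp()
    await ws.send(json.dumps(request))
    res = json.loads(await ws.recv())
    latency = datetime.datetime.now().timestamp() - start
    if latency > threshold:
        print("request took more than 100ms")
    return res, latency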
= 0.0 + for x in range(0,numCalls): + + idx = random.randrange(0,len(accounts)) + account = accounts[idx] + cursor = cursors[idx] + start = datetime.datetime.now().timestamp() + if cursor is None: + await ws.send(json.dumps({"command":"account_tx","account":account,"binary":True,"limit":limit})) + else: + marker = {} + marker["ledger"] = cursor["ledger_sequence"] + marker["seq"] = cursor["transaction_index"] + await ws.send(json.dumps({"command":"account_tx","account":account,"cursor":cursor,"marker":marker,"binary":True,"limit":limit,"forward":False})) + + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + print(end-start) + time += (end - start) + txns = [] + if "result" in res: + txns = res["result"]["transactions"] + else: + txns = res["transactions"] + print(len(txns)) + print(account + " " + json.dumps(cursor)) + if (end - start) > 0.1: + print("request took more than 100ms") + print("Latency = " + str(time / numCalls)) + + except websockets.exceptions.connectionclosederror as e: + print(e) +async def account_txs(ip, port, accounts, numCalls): + + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(accounts)) + cursor = None + account = None + for x in range(0,numCalls): + + if cursor is None: + account = accounts[random.randrange(0,len(accounts))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"account_tx","account":account,"binary":True,"limit":200})) + else: + await ws.send(json.dumps({"command":"account_tx","account":account,"cursor":cursor,"binary":True,"limit":200})) + + + res = json.loads(await ws.recv()) + if "cursor" in res: + if cursor: + print(account + " " + json.dumps(cursor)) + else: + print(account + " " + "None") + #cursor = res["cursor"] + elif cursor: + print(account + " " + json.dumps(cursor)) + cursor = None + + + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + + except websockets.exceptions.connectionclosederror as e: + print(e) + async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None): address = 'ws://' + str(ip) + ':' + str(port) try: @@ -188,8 +321,14 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No if minLedger is not None and maxLedger is not None: req["ledger_index_min"] = minLedger req["ledger_index_max"] = maxLedger + start = datetime.datetime.now().timestamp() await ws.send(json.dumps(req)) - res = json.loads(await ws.recv()) + res = await ws.recv() + + end = datetime.datetime.now().timestamp() + + print(end - start) + res = json.loads(res) #print(json.dumps(res,indent=4,sort_keys=True)) if "result" in res: print(len(res["result"]["transactions"])) @@ -206,7 +345,11 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]} print(marker) else: + print(res) break + if numCalls > numPages: + print("breaking") + break return results except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -221,6 +364,20 @@ async def tx(ip, port, tx_hash, binary): return res except websockets.exceptions.connectionclosederror as e: print(e) +async def txs(ip, port, hashes, numCalls): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + for x in range(0,numCalls): + h = hashes[random.randrange(0,len(hashes))] + start = 
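# --- Illustrative sketch (not part of the patch) ---
# account_txs and account_txs_full above resume paging by echoing the cursor
# (ledger_sequence / transaction_index) returned with the previous response back
# into the next account_tx request. page_account_tx is a hypothetical condensed
# version of that loop that just collects transactions.
import json

import websockets


async def page_account_tx(address, account, pages=3, limit=200):
    txns = []
    cursor = None
    async with websockets.connect(address, max_size=1000000000) as ws:
        for _ in range(pages):
            req = {"command": "account_tx", "account": account,
                   "binary": True, "limit": limit}
            if cursor is not None:
                req["cursor"] = cursor  # resume where the last page stopped
            await ws.send(json.dumps(req))
            res = json.loads(await ws.recv())
            if "result" in res:
                res = res["result"]
            txns.extend(res.get("transactions", []))
            cursor = res.get("cursor")
            if cursor is None:
                break  # no more pages
    return txns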
datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"tx","transaction":h,"binary":True})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + except websockets.exceptions.connectionclosederror as e: + print(e) async def ledger_entry(ip, port, index, ledger, binary): address = 'ws://' + str(ip) + ':' + str(port) @@ -229,17 +386,59 @@ async def ledger_entry(ip, port, index, ledger, binary): await ws.send(json.dumps({"command":"ledger_entry","index":index,"binary":bool(binary),"ledger_index":int(ledger)})) res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) + if "result" in res: + res = res["result"] + if "object" in res: + return (index,res["object"]) + else: + return (index,res["node_binary"]) + except websockets.exceptions.connectionclosederror as e: + print(e) +async def ledger_entries(ip, port, ledger, keys, numCalls): + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address) as ws: + print(len(keys)) + for x in range(0,numCalls): + index = keys[random.randrange(0,len(keys))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"ledger_entry","index":index,"binary":True,"ledger_index":int(ledger)})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + except websockets.exceptions.connectionclosederror as e: print(e) +async def ledger_entries(ip, port,ledger): + address = 'ws://' + str(ip) + ':' + str(port) + entries = await ledger_data(ip, port, ledger, 200, True) + + try: + async with websockets.connect(address) as ws: + objects = [] + for x,y in zip(entries[0],entries[1]): + await ws.send(json.dumps({"command":"ledger_entry","index":x,"binary":True,"ledger_index":int(ledger)})) + res = json.loads(await ws.recv()) + objects.append((x,res["object"])) + if res["object"] != y: + print("data mismatch") + return None + print("Data matches!") + return objects + + except websockets.exceptions.connectionclosederror as e: + print(e) async def ledger_data(ip, port, ledger, limit, binary, cursor): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor})) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) objects = [] blobs = [] keys = [] @@ -251,6 +450,8 @@ async def ledger_data(ip, port, ledger, limit, binary, cursor): for x in objects: blobs.append(x["data"]) keys.append(x["index"]) + if len(x["index"]) != 64: + print("bad key") return (keys,blobs) except websockets.exceptions.connectionclosederror as e: @@ -268,12 +469,12 @@ def writeLedgerData(data,filename): f.write('\n') -async def ledger_data_full(ip, port, ledger, binary, limit): +async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1): address = 'ws://' + str(ip) + ':' + str(port) try: blobs = [] keys = [] - async with websockets.connect(address) as ws: + async with websockets.connect(address,max_size=1000000000) as ws: if int(limit) < 2048: limit = 2048 marker = None @@ -287,6 +488,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit): await 
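# --- Illustrative sketch (not part of the patch) ---
# The second ledger_entries helper above checks ledger_entry against
# ledger_data: it takes (index, blob) pairs from a ledger_data call, re-fetches
# each index with ledger_entry, and compares the binary blobs. verify_entries is
# a hypothetical minimal version of that comparison.
import json

import websockets


async def verify_entries(address, ledger_index, pairs):
    # pairs: iterable of (index, expected_blob) as returned by a ledger_data call
    async with websockets.connect(address, max_size=1000000000) as ws:
        for index, expected in pairs:
            await ws.send(json.dumps({"command": "ledger_entry", "index": index,
                                      "binary": True,
                                      "ledger_index": int(ledger_index)}))
            res = json.loads(await ws.recv())
            if "result" in res:
                res = res["result"]
            blob = res.get("object", res.get("node_binary"))
            if blob != expected:
                print("data mismatch")
                return False
    print("Data matches!")
    return True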
ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"cursor":marker, "marker":marker,"binary":bool(binary), "limit":int(limit)})) res = json.loads(await ws.recv()) + if "error" in res: print(res) @@ -298,11 +500,23 @@ async def ledger_data_full(ip, port, ledger, binary, limit): else: objects = res["objects"] for x in objects: - blobs.append(x["data"]) - keys.append(x["index"]) + if binary: + if typ is None or x["data"][2:6] == typ: + print(json.dumps(x)) + blobs.append(x["data"]) + keys.append(x["index"]) + else: + if typ is None or x["LedgerEntryType"] == typ: + print(json.dumps(x)) + blobs.append(x) + keys.append(x["index"]) + if limit != -1 and len(keys) > count: + print("stopping early") + print(len(keys)) + print("done") + return (keys,blobs) if "cursor" in res: marker = res["cursor"] - print(marker) elif "result" in res and "marker" in res["result"]: marker = res["result"]["marker"] print(marker) @@ -345,8 +559,27 @@ def compare_book_offers(aldous, p2p): return True +async def book_offerses(ip, port, ledger, books, numCalls): + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + try: + async with websockets.connect(address,max_size=1000000000) as ws: + print(len(books)) + for x in range(0,numCalls): + book = books[random.randrange(0,len(books))] + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"book_offers","book":book,"binary":True})) + res = json.loads(await ws.recv()) + end = datetime.datetime.now().timestamp() + print(book) + print(len(res["offers"])) + if (end - start) > 0.1: + print("request took more than 100ms") -async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary): + except websockets.exceptions.connectionclosederror as e: + print(e) + +async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary, limit): address = 'ws://' + str(ip) + ':' + str(port) try: @@ -360,7 +593,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, taker_pays = json.loads("{\"currency\":\"" + pay_currency + "\"}") if pay_issuer is not None: taker_pays["issuer"] = pay_issuer - req = {"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets, "binary":bool(binary)} + req = {"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets, "binary":bool(binary), "limit":int(limit)} if cursor is not None: req["cursor"] = cursor await ws.send(json.dumps(req)) @@ -372,6 +605,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, offers.append(x) if "cursor" in res: cursor = res["cursor"] + print(cursor) else: print(len(offers)) return offers @@ -415,9 +649,16 @@ def compareLedger(aldous, p2p): print(aldous) print(p2p) +def getHashesFromFile(filename): + hashes = [] + with open(filename) as f: + for line in f: + if len(line) == 65: + hashes.append(line[0:64]) + return hashes + def getHashes(res): - print(json.dumps(res,indent=4,sort_keys=True)) if "result" in res: res = res["result"]["ledger"] @@ -431,16 +672,64 @@ def getHashes(res): hashes.append(x) return hashes +import random +import datetime +numCalls = 0 +async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls): + global numCalls + address = 'ws://' + str(ip) + ':' + str(port) + random.seed() + ledger = 0 + try: + async with websockets.connect(address,max_size=1000000000) as ws: + global numCalls + for i in range(0, maxCalls): + + + ledger = 
random.randrange(minLedger,maxLedger) + start = datetime.datetime.now().timestamp() + await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":True, "transactions":bool(transactions),"expand":bool(expand)})) + res = json.loads(await ws.recv()) + #print(res["header"]["blob"]) + end = datetime.datetime.now().timestamp() + if (end - start) > 0.1: + print("request took more than 100ms") + numCalls = numCalls + 1 + + except websockets.exceptions.ConnectionClosedError as e: + print(e) + print(ledger) + +async def getManyHashes(ip, port, minLedger,maxLedger): + + hashes = [] + for x in range(minLedger,maxLedger): + res = await ledger(ip, port, x,True, True, False) + hashes.extend(getHashes(res)) + print(len(hashes)) + return hashes +async def getManyHashes(ip, port, minLedger,maxLedger, numHashes): + + random.seed() + hashes = [] + while len(hashes) < numHashes: + + lgr = random.randrange(minLedger,maxLedger) + res = await ledger(ip, port, lgr,True, True, False) + hashes.extend(getHashes(res)) + print(len(hashes)) + return hashes + + async def ledger(ip, port, ledger, binary, transactions, expand): address = 'ws://' + str(ip) + ':' + str(port) try: - async with websockets.connect(address) as ws: + async with websockets.connect(address,max_size=1000000000) as ws: await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) - print(json.dumps(res,indent=4,sort_keys=True)) - print(bool(binary)) + #print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.connectionclosederror as e: @@ -456,16 +745,48 @@ async def ledger_range(ip, port): if "error" in res: await ws.send(json.dumps({"command":"server_info"})) res = json.loads(await ws.recv()) + print(res) rng = res["result"]["info"]["complete_ledgers"] + if rng == "empty": + return (0,0) idx = rng.find("-") return (int(rng[0:idx]),int(rng[idx+1:-1])) return (res["ledger_index_min"],res["ledger_index_max"]) except websockets.exceptions.connectionclosederror as e: print(e) +async def fee(ip, port): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"fee"})) + res = json.loads(await ws.recv()) + print(json.dumps(res,indent=4,sort_keys=True)) + except websockets.exceptions.connectionclosederror as e: + print(e) + +async def ledger_diff(ip, port, base, desired, includeBlobs): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"ledger_diff","base_ledger":int(base),"desired_ledger":int(desired),"include_blobs":bool(includeBlobs)})) + res = json.loads(await ws.recv()) + print(json.dumps(res,indent=4,sort_keys=True)) + except websockets.exceptions.connectionclosederror as e: + print(e) + + +async def perf(ip, port): + res = await ledger_range(ip,port) + time.sleep(10) + res2 = await ledger_range(ip,port) + lps = ((int(res2[1]) - int(res[1])) / 10.0) + print(lps) + parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry"]) +parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", 
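# --- Illustrative sketch (not part of the patch) ---
# ledger_range and perf above fall back to server_info and read the
# complete_ledgers string, which the helpers treat as either "empty" or a single
# "min-max" range. parse_complete_ledgers is a hypothetical standalone version
# of that parsing.
def parse_complete_ledgers(complete_ledgers):
    if complete_ledgers == "empty":
        return (0, 0)
    idx = complete_ledgers.find("-")
    return (int(complete_ledgers[0:idx]), int(complete_ledgers[idx + 1:]))


# examples
assert parse_complete_ledgers("empty") == (0, 0)
assert parse_complete_ledgers("32570-62400000") == (32570, 62400000)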
"ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee"]) + parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') parser.add_argument('--hash') @@ -486,7 +807,15 @@ parser.add_argument('--minLedger',default=-1) parser.add_argument('--maxLedger',default=-1) parser.add_argument('--filename',default=None) parser.add_argument('--index') +parser.add_argument('--numPages',default=3) +parser.add_argument('--base') +parser.add_argument('--desired') +parser.add_argument('--includeBlobs',default=False) +parser.add_argument('--type',default=None) parser.add_argument('--cursor',default='0000000000000000000000000000000000000000000000000000000000000000') +parser.add_argument('--numCalls',default=10000) +parser.add_argument('--numRunners',default=1) +parser.add_argument('--count',default=-1) @@ -497,16 +826,154 @@ def run(args): asyncio.set_event_loop(asyncio.new_event_loop()) if(args.ledger is None): args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] - if args.action == "account_info": + if args.action == "fee": + asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port)) + elif args.action == "perf": + asyncio.get_event_loop().run_until_complete( + perf(args.ip,args.port)) + elif args.action == "account_info": res1 = asyncio.get_event_loop().run_until_complete( account_info(args.ip, args.port, args.account, args.ledger, args.binary)) if args.verify: res2 = asyncio.get_event_loop().run_until_complete( account_info(args.p2pIp, args.p2pPort, args.account, args.ledger, args.binary)) print(compareAccountInfo(res1,res2)) + elif args.action == "txs": + #hashes = asyncio.get_event_loop().run_until_complete(getManyHashes(args.ip,args.port, int(args.minLedger),int(args.maxLedger), int(args.numCalls))) + #for x in hashes: + # print(x) + #return + hashes = getHashesFromFile(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(txs(args.ip, args.port, hashes,int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "ledgers": + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(ledgers(args.ip, args.port, int(args.minLedger), int(args.maxLedger), args.transactions, args.expand, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. 
Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "ledger_entries": + keys = [] + ledger_index = 0 + with open(args.filename) as f: + i = 0 + for line in f: + if ledger_index == 0: + ledger_index = int(line) + elif len(line) == 65: + keys.append(line[0:64]) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(ledger_entries(args.ip, args.port, ledger_index,keys, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "account_txs": + accounts = getAccounts(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(account_txs(args.ip, args.port,accounts, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + elif args.action == "account_txs_full": + accounts,cursors = getAccountsAndCursors(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(account_txs_full(args.ip, args.port,accounts,cursors,int(args.numCalls), int(args.limit)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + print("Latency = " + str((end - start) / int(args.numCalls)) + " seconds") + elif args.action == "account_infos": + accounts = getAccounts(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(account_infos(args.ip, args.port,accounts, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second") + + elif args.action == "book_offerses": + books = getBooks(args.filename) + async def runner(): + + tasks = [] + for x in range(0,int(args.numRunners)): + tasks.append(asyncio.create_task(book_offerses(args.ip, args.port,int(args.ledger),books, int(args.numCalls)))) + for t in tasks: + await t + + start = datetime.datetime.now().timestamp() + asyncio.run(runner()) + end = datetime.datetime.now().timestamp() + num = int(args.numRunners) * int(args.numCalls) + print("Completed " + str(num) + " in " + str(end - start) + " seconds. 
Throughput = " + str(num / (end - start)) + " calls per second") elif args.action == "ledger_entry": asyncio.get_event_loop().run_until_complete( ledger_entry(args.ip, args.port, args.index, args.ledger, args.binary)) + elif args.action == "ledger_entries": + res = asyncio.get_event_loop().run_until_complete( + ledger_entries(args.ip, args.port, args.ledger)) + if args.verify: + objects = [] + for x in res: + res2 = asyncio.get_event_loop().run_until_complete( + ledger_entry(args.p2pIp, args.p2pPort,x[0] , args.ledger, True)) + if res2[1] != x[1]: + print("mismatch!") + return + print("Data matches!") + elif args.action == "ledger_diff": + asyncio.get_event_loop().run_until_complete( + ledger_diff(args.ip, args.port, args.base, args.desired, args.includeBlobs)) elif args.action == "tx": if args.verify: args.binary = True @@ -527,9 +994,11 @@ def run(args): res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False)) args.account = res["transaction"]["Account"] - rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) res = asyncio.get_event_loop().run_until_complete( account_tx(args.ip, args.port, args.account, args.binary)) + rng = getMinAndMax(res) + + if args.verify: res2 = asyncio.get_event_loop().run_until_complete( @@ -543,13 +1012,21 @@ def run(args): res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False)) args.account = res["transaction"]["Account"] - rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) + print("starting") res = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.ip, args.port, args.account, args.binary)) + account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages))) + rng = getMinAndMax(res) print(len(res["transactions"])) + print(args.account) + txs = set() + for x in res["transactions"]: + txs.add((x["transaction"],x["ledger_sequence"])) + print(len(txs)) + if args.verify: + print("requesting p2p node") res2 = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1])) + account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1],int(args.numPages))) print(compareAccountTx(res,res2)) elif args.action == "ledger_data": @@ -565,7 +1042,8 @@ def run(args): args.filename = str(args.port) + "." 
+ str(args.ledger) res = asyncio.get_event_loop().run_until_complete( - ledger_data_full(args.ip, args.port, args.ledger, args.binary, args.limit)) + ledger_data_full(args.ip, args.port, args.ledger, bool(args.binary), args.limit,args.type, int(args.count))) + print(len(res[0])) if args.verify: writeLedgerData(res,args.filename) @@ -589,10 +1067,10 @@ def run(args): if args.verify: args.binary=True res = asyncio.get_event_loop().run_until_complete( - book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary)) + book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary,args.limit)) if args.verify: res2 = asyncio.get_event_loop().run_until_complete( - book_offers(args.p2pIp, args.p2pPort, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary)) + book_offers(args.p2pIp, args.p2pPort, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary, args.limit)) print(compare_book_offers(res,res2)) else: diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 3c196ff8..1b4d65e3 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -224,7 +224,10 @@ public: BOOST_LOG_TRIVIAL(debug) << " received request : " << request; try { + auto start = std::chrono::system_clock::now(); response = buildResponse(request, backend_); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " RPC call took " << ((end - start).count() / 1000000000.0) << " . request = " << request; } catch (Backend::DatabaseTimeout const& t) {