From 7a6dfe5967165fc800a7d0faecb3ccd5c8303adc Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 5 Mar 2021 11:25:12 -0500 Subject: [PATCH] postgres fixes --- handlers/AccountInfo.cpp | 23 ++----- handlers/BookOffers.cpp | 9 ++- handlers/Ledger.cpp | 11 +++- handlers/LedgerData.cpp | 11 +++- handlers/RPCHelpers.cpp | 14 ++++ handlers/RPCHelpers.h | 5 ++ reporting/CassandraBackend.cpp | 8 +-- reporting/DBHelpers.cpp | 3 +- reporting/DBHelpers.h | 18 +----- reporting/Pg.h | 23 ++++++- reporting/PostgresBackend.cpp | 115 +++++++++++---------------------- reporting/ReportingETL.cpp | 7 +- test.py | 26 ++++++-- websocket_server_async.cpp | 5 +- 14 files changed, 139 insertions(+), 139 deletions(-) diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp index 6a842d0b..6176004a 100644 --- a/handlers/AccountInfo.cpp +++ b/handlers/AccountInfo.cpp @@ -56,24 +56,11 @@ doAccountInfo( response["error"] = "missing account field"; return response; } - size_t ledgerSequence = 0; - if (not request.contains("ledger_index")) + auto ledgerSequence = ledgerSequenceFromRequest(request, backend); + if (!ledgerSequence) { - auto latest = backend.fetchLatestLedgerSequence(); - - if (not latest) - { - response["error"] = "database is empty"; - return response; - } - else - { - ledgerSequence = *latest; - } - } - else - { - ledgerSequence = request.at("ledger_index").as_int64(); + response["error"] = "Empty database"; + return response; } // bool bStrict = request.contains("strict") && @@ -92,7 +79,7 @@ doAccountInfo( auto start = std::chrono::system_clock::now(); std::optional> dbResponse = - backend.fetchLedgerObject(key.key, ledgerSequence); + backend.fetchLedgerObject(key.key, *ledgerSequence); auto end = std::chrono::system_clock::now(); auto time = std::chrono::duration_cast(end - start) diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 5e629d06..a4367cac 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -94,8 +94,13 @@ doBookOffers( { std::cout << "enter" << std::endl; boost::json::object response; - uint32_t sequence = request.at("ledger_index").as_int64(); + auto ledgerSequence = ledgerSequenceFromRequest(request, backend); + if (!ledgerSequence) + { + response["error"] = "Empty database"; + return response; + } if (!request.contains("taker_pays")) { response["error"] = "Missing field taker_pays"; @@ -306,7 +311,7 @@ doBookOffers( ripple::uint256 bookBase = getBookBase(book); auto start = std::chrono::system_clock::now(); auto [offers, retCursor] = - backend.fetchBookOffers(bookBase, sequence, limit, cursor); + backend.fetchBookOffers(bookBase, *ledgerSequence, limit, cursor); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(warning) << "Time loading books from Postgres: " diff --git a/handlers/Ledger.cpp b/handlers/Ledger.cpp index d0baedad..b5ec7cec 100644 --- a/handlers/Ledger.cpp +++ b/handlers/Ledger.cpp @@ -10,9 +10,14 @@ doLedger(boost::json::object const& request, BackendInterface const& backend) response["error"] = "Please specify a ledger index"; return response; } - uint32_t ledgerSequence = request.at("ledger_index").as_int64(); + auto ledgerSequence = ledgerSequenceFromRequest(request, backend); + if (!ledgerSequence) + { + response["error"] = "Empty database"; + return response; + } - auto lgrInfo = backend.fetchLedgerBySequence(ledgerSequence); + auto lgrInfo = backend.fetchLedgerBySequence(*ledgerSequence); if (!lgrInfo) { response["error"] = "ledger not found"; @@ -32,7 +37,7 @@ doLedger(boost::json::object const& request, BackendInterface const& backend) lgrInfo->parentCloseTime.time_since_epoch().count(); header["close_time"] = lgrInfo->closeTime.time_since_epoch().count(); header["close_time_resolution"] = lgrInfo->closeTimeResolution.count(); - auto txns = backend.fetchAllTransactionsInLedger(ledgerSequence); + auto txns = backend.fetchAllTransactionsInLedger(*ledgerSequence); response["transactions"] = boost::json::value(boost::json::array_kind); boost::json::array& jsonTransactions = response.at("transactions").as_array(); diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 66bd8e12..93d3aac8 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -41,7 +41,12 @@ doLedgerData( BackendInterface const& backend) { boost::json::object response; - uint32_t ledger = request.at("ledger_index").as_int64(); + auto ledgerSequence = ledgerSequenceFromRequest(request, backend); + if (!ledgerSequence) + { + response["error"] = "Empty database"; + return response; + } ripple::uint256 cursor; if (request.contains("cursor")) @@ -54,7 +59,7 @@ doLedgerData( : (binary ? 2048 : 256); Backend::LedgerPage page; auto start = std::chrono::system_clock::now(); - page = backend.fetchLedgerPage(cursor, ledger, limit); + page = backend.fetchLedgerPage(cursor, *ledgerSequence, limit); auto end = std::chrono::system_clock::now(); @@ -82,7 +87,7 @@ doLedgerData( } response["objects"] = objects; if (returnedCursor) - response["marker"] = ripple::strHex(*returnedCursor); + response["cursor"] = ripple::strHex(*returnedCursor); response["num_results"] = results.size(); response["db_time"] = time; diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index a8afcec1..77e0f997 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -66,3 +66,17 @@ getJson(ripple::SLE const& sle) .count(); return value.as_object(); } +std::optional +ledgerSequenceFromRequest( + boost::json::object const& request, + BackendInterface const& backend) +{ + if (not request.contains("ledger_index")) + { + return backend.fetchLatestLedgerSequence(); + } + else + { + return request.at("ledger_index").as_int64(); + } +} diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index cc141df1..f07bf3ef 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -20,4 +20,9 @@ getJson(ripple::STBase const& obj); boost::json::object getJson(ripple::SLE const& sle); +std::optional +ledgerSequenceFromRequest( + boost::json::object const& request, + BackendInterface const& backend); + #endif diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index fceb51fb..993f2a30 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -731,14 +731,14 @@ CassandraBackend::open() setupPreparedStatements = true; } - work_.emplace(ioContext_); - ioThread_ = std::thread{[this]() { ioContext_.run(); }}; - open_ = true; - if (config_.contains("max_requests_outstanding")) { maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); } + work_.emplace(ioContext_); + ioThread_ = std::thread{[this]() { ioContext_.run(); }}; + open_ = true; + BOOST_LOG_TRIVIAL(info) << "Opened database successfully"; } } // namespace Backend diff --git a/reporting/DBHelpers.cpp b/reporting/DBHelpers.cpp index 761b0974..9cf82030 100644 --- a/reporting/DBHelpers.cpp +++ b/reporting/DBHelpers.cpp @@ -83,6 +83,7 @@ writeBooks(std::vector const& bookDirData, PgQuery& pg) } */ +/* bool writeToPostgres( ripple::LedgerInfo const& info, @@ -166,4 +167,4 @@ writeToPostgres( return false; } } - +*/ diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index 9344fe56..1a66553d 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -33,17 +33,15 @@ struct AccountTransactionsData uint32_t ledgerSequence; uint32_t transactionIndex; ripple::uint256 txHash; - ripple::uint256 nodestoreHash; AccountTransactionsData( ripple::TxMeta& meta, - ripple::uint256&& nodestoreHash, + ripple::uint256 const& txHash, beast::Journal& j) : accounts(meta.getAffectedAccounts(j)) , ledgerSequence(meta.getLgrSeq()) , transactionIndex(meta.getIndex()) - , txHash(meta.getTxID()) - , nodestoreHash(std::move(nodestoreHash)) + , txHash(txHash) { } }; @@ -68,18 +66,6 @@ getBook(std::string const& offer) return book; } -/// Write new ledger and transaction data to Postgres -/// @param info Ledger Info to write -/// @param accountTxData transaction data to write -/// @param pgPool pool of Postgres connections -/// @param j journal (for logging) -/// @return whether the write succeeded -bool -writeToPostgres( - ripple::LedgerInfo const& info, - std::vector const& accountTxData, - std::shared_ptr const& pgPool); - inline ripple::LedgerInfo deserializeHeader(ripple::Slice data) { diff --git a/reporting/Pg.h b/reporting/Pg.h index f7d976b7..bdc3fc2a 100644 --- a/reporting/Pg.h +++ b/reporting/Pg.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_CORE_PG_H_INCLUDED #define RIPPLE_CORE_PG_H_INCLUDED +#include #include #include #include @@ -35,6 +36,7 @@ #include #include #include +#include #include #include @@ -135,7 +137,26 @@ public: char const* c_str(int ntuple = 0, int nfield = 0) const { - return PQgetvalue(result_.get(), ntuple, nfield); + return PQgetvalue(result_.get(), ntuple, nfield) + 2; + } + + std::vector + asUnHexedBlob(int ntuple = 0, int nfield = 0) const + { + std::string_view view{c_str(ntuple, nfield)}; + auto res = ripple::strUnHex(view.size(), view.cbegin(), view.cend()); + if (res) + return *res; + return {}; + } + + ripple::uint256 + asUInt256(int ntuple = 0, int nfield = 0) const + { + ripple::uint256 val; + if (!val.parseHex(c_str(ntuple, nfield))) + throw std::runtime_error("Pg - failed to parse hex into uint256"); + return val; } /** Return field as equivalent to Postgres' INT type (32 bit signed). diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 3e811fb9..1f5a86e6 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -26,9 +26,9 @@ PostgresBackend::writeLedger( ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags % ripple::strHex(ledgerInfo.accountHash) % ripple::strHex(ledgerInfo.txHash)); - BOOST_LOG_TRIVIAL(trace) << __func__ << " : " - << " : " - << "query string = " << ledgerInsert; + BOOST_LOG_TRIVIAL(info) << __func__ << " : " + << " : " + << "query string = " << ledgerInsert; auto res = pgQuery(ledgerInsert.data()); @@ -44,17 +44,14 @@ PostgresBackend::writeAccountTransactions( PgQuery pg(pgPool_); for (auto const& record : data) { - std::string txHash = ripple::strHex(record.txHash); - auto idx = record.transactionIndex; - auto ledgerSeq = record.ledgerSequence; - for (auto const& a : record.accounts) { std::string acct = ripple::strHex(a); accountTxBuffer_ << "\\\\x" << acct << '\t' - << std::to_string(ledgerSeq) << '\t' - << std::to_string(idx) << '\t' << "\\\\x" - << ripple::strHex(txHash) << '\n'; + << std::to_string(record.ledgerSequence) << '\t' + << std::to_string(record.transactionIndex) << '\t' + << "\\\\x" << ripple::strHex(record.txHash) + << '\n'; } } } @@ -149,35 +146,30 @@ ripple::LedgerInfo parseLedgerInfo(PgResult const& res) { std::int64_t ledgerSeq = res.asBigInt(0, 0); - char const* hash = res.c_str(0, 1); - char const* prevHash = res.c_str(0, 2); + ripple::uint256 hash = res.asUInt256(0, 1); + ripple::uint256 prevHash = res.asUInt256(0, 2); std::int64_t totalCoins = res.asBigInt(0, 3); std::int64_t closeTime = res.asBigInt(0, 4); std::int64_t parentCloseTime = res.asBigInt(0, 5); std::int64_t closeTimeRes = res.asBigInt(0, 6); std::int64_t closeFlags = res.asBigInt(0, 7); - char const* accountHash = res.c_str(0, 8); - char const* txHash = res.c_str(0, 9); + ripple::uint256 accountHash = res.asUInt256(0, 8); + ripple::uint256 txHash = res.asUInt256(0, 9); using time_point = ripple::NetClock::time_point; using duration = ripple::NetClock::duration; ripple::LedgerInfo info; - if (!info.parentHash.parseHex(prevHash + 2)) - throw std::runtime_error("parseLedgerInfo - error parsing parent hash"); - if (!info.txHash.parseHex(txHash + 2)) - throw std::runtime_error("parseLedgerInfo - error parsing tx map hash"); - if (!info.accountHash.parseHex(accountHash + 2)) - throw std::runtime_error( - "parseLedgerInfo - error parsing state map hash"); + info.seq = ledgerSeq; + info.hash = hash; + info.parentHash = prevHash; info.drops = totalCoins; info.closeTime = time_point{duration{closeTime}}; info.parentCloseTime = time_point{duration{parentCloseTime}}; info.closeFlags = closeFlags; info.closeTimeResolution = duration{closeTimeRes}; - info.seq = ledgerSeq; - if (!info.hash.parseHex(hash + 2)) - throw std::runtime_error("parseLedgerInfo - error parsing ledger hash"); + info.accountHash = accountHash; + info.txHash = txHash; info.validated = true; return info; } @@ -253,10 +245,7 @@ PostgresBackend::fetchLedgerObject( auto res = pgQuery(sql.str().data()); if (checkResult(res, 1)) { - char const* object = res.c_str(0, 0); - std::string_view view{object}; - std::vector blob{view.front(), view.back()}; - return blob; + return res.asUnHexedBlob(0, 0); } return {}; @@ -274,13 +263,7 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const auto res = pgQuery(sql.str().data()); if (checkResult(res, 3)) { - char const* txn = res.c_str(0, 0); - char const* metadata = res.c_str(0, 1); - std::string_view txnView{txn}; - std::string_view metadataView{metadata}; - return { - {{txnView.front(), txnView.back()}, - {metadataView.front(), metadataView.back()}}}; + return {{res.asUnHexedBlob(0, 0), res.asUnHexedBlob(0, 1)}}; } return {}; @@ -298,13 +281,7 @@ PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const std::vector txns; for (size_t i = 0; i < numRows; ++i) { - char const* txn = res.c_str(0, 0); - char const* metadata = res.c_str(0, 1); - std::string_view txnView{txn}; - std::string_view metadataView{metadata}; - txns.push_back( - {{txnView.front(), txnView.back()}, - {metadataView.front(), metadataView.back()}}); + txns.push_back({res.asUnHexedBlob(i, 0), res.asUnHexedBlob(i, 1)}); } return txns; } @@ -333,12 +310,7 @@ PostgresBackend::fetchLedgerPage( std::vector objects; for (size_t i = 0; i < numRows; ++i) { - ripple::uint256 key; - if (!key.parseHex(res.c_str(i, 0))) - throw std::runtime_error("Error parsing key from postgres"); - char const* object = res.c_str(i, 1); - std::string_view view{object}; - objects.push_back({std::move(key), {view.front(), view.back()}}); + objects.push_back({res.asUInt256(i, 0), res.asUnHexedBlob(i, 1)}); } if (numRows == limit) return {objects, objects[objects.size() - 1].key}; @@ -357,13 +329,13 @@ PostgresBackend::fetchBookOffers( { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT key FROM" - << " (SELECT DISTINCT ON (key) * FROM books WHERE book = " + sql << "SELECT offer_key FROM" + << " (SELECT DISTINCT ON (offer_key) * FROM books WHERE book = " << "\'\\x" << ripple::strHex(book) << "\' AND ledger_seq <= " << std::to_string(ledgerSequence); if (cursor) - sql << " AND key > \'" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY key DESC, ledger_seq DESC)" + sql << " AND offer_key > \'" << ripple::strHex(*cursor) << "\'"; + sql << " ORDER BY offer_key DESC, ledger_seq DESC)" << " sub WHERE NOT deleted" << " LIMIT " << std::to_string(limit); auto res = pgQuery(sql.str().data()); @@ -372,10 +344,7 @@ PostgresBackend::fetchBookOffers( std::vector keys; for (size_t i = 0; i < numRows; ++i) { - ripple::uint256 key; - if (!key.parseHex(res.c_str(i, 0))) - throw std::runtime_error("Error parsing key from postgres"); - keys.push_back(std::move(key)); + keys.push_back(res.asUInt256(i, 0)); } std::vector blobs = fetchLedgerObjects(keys, ledgerSequence); std::vector results; @@ -414,14 +383,8 @@ PostgresBackend::fetchTransactions( std::vector results; for (size_t i = 0; i < numRows; ++i) { - char const* txn = res.c_str(i, 0); - char const* metadata = res.c_str(i, 1); - std::string_view txnView{txn}; - std::string_view metadataView{metadata}; - results.push_back( - {{txnView.front(), txnView.back()}, - {metadataView.front(), metadataView.back()}}); + {res.asUnHexedBlob(i, 0), res.asUnHexedBlob(i, 1)}); } return results; } @@ -436,7 +399,7 @@ PostgresBackend::fetchLedgerObjects( { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT object FROM objects WHERE"; + sql << "SELECT DISTINCT ON(key) object FROM objects WHERE"; bool first = true; for (auto const& key : keys) @@ -444,27 +407,27 @@ PostgresBackend::fetchLedgerObjects( if (!first) { sql << " OR "; - first = false; } else { sql << " ( "; + first = false; } sql << " key = " << "\'\\x" << ripple::strHex(key) << "\'"; } sql << " ) " << " AND ledger_seq <= " << std::to_string(sequence) - << " ORDER BY ledger_seq DESC LIMIT 1"; + << " ORDER BY key, ledger_seq DESC"; + + BOOST_LOG_TRIVIAL(info) << sql.str(); auto res = pgQuery(sql.str().data()); if (size_t numRows = checkResult(res, 1)) { std::vector results; for (size_t i = 0; i < numRows; ++i) { - char const* object = res.c_str(i, 0); - std::string_view view{object}; - results.push_back({view.front(), view.back()}); + results.push_back(res.asUnHexedBlob(i, 0)); } return results; } @@ -483,7 +446,7 @@ PostgresBackend::fetchAccountTransactions( std::stringstream sql; sql << "SELECT hash, ledger_seq, transaction_index FROM " "account_transactions WHERE account = " - << ripple::strHex(account); + << "\'\\x" << ripple::strHex(account) << "\'"; if (cursor) sql << " AND ledger_seq < " << cursor->ledgerSequence << " AND transaction_index < " << cursor->transactionIndex; @@ -494,11 +457,7 @@ PostgresBackend::fetchAccountTransactions( std::vector hashes; for (size_t i = 0; i < numRows; ++i) { - ripple::uint256 hash; - if (!hash.parseHex(res.c_str(i, 0))) - throw std::runtime_error( - "Error parsing transaction hash from Postgres"); - hashes.push_back(std::move(hash)); + hashes.push_back(res.asUInt256(i, 0)); } if (numRows == limit) @@ -545,12 +504,12 @@ PostgresBackend::finishWrites() const if (abortWrite_) return false; PgQuery pg(pgPool_); - std::string objectsStr = objectsBuffer_.str(); - if (objectsStr.size()) - pg.bulkInsert("objects", objectsStr); pg.bulkInsert("transactions", transactionsBuffer_.str()); pg.bulkInsert("books", booksBuffer_.str()); pg.bulkInsert("account_transactions", accountTxBuffer_.str()); + std::string objectsStr = objectsBuffer_.str(); + if (objectsStr.size()) + pg.bulkInsert("objects", objectsStr); auto res = pg("COMMIT"); if (!res || res.status() != PGRES_COMMAND_OK) { diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 25ab79fd..c3fb1f30 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -72,10 +72,9 @@ ReportingETL::insertTransactions( BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << "Inserting transaction = " << sttx.getTransactionID(); - ripple::uint256 nodestoreHash = sttx.getTransactionID(); auto journal = ripple::debugLog(); - accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal); + accountTxData.emplace_back(txMeta, sttx.getTransactionID(), journal); std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; flatMapBackend_->writeTransaction( std::move(keyStr), @@ -115,6 +114,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) << "Deserialized ledger header. " << detail::toString(lgrInfo); flatMapBackend_->startWrites(); + flatMapBackend_->writeLedger( + lgrInfo, std::move(*ledgerData->mutable_ledger_header())); std::vector accountTxData = insertTransactions(lgrInfo, *ledgerData); @@ -129,8 +130,6 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) if (!stopping_) { flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - flatMapBackend_->writeLedger( - lgrInfo, std::move(*ledgerData->mutable_ledger_header())); } flatMapBackend_->finishWrites(); auto end = std::chrono::system_clock::now(); diff --git a/test.py b/test.py index 99cbc229..1cf5d920 100755 --- a/test.py +++ b/test.py @@ -19,11 +19,11 @@ async def account_info(ip, port, account, ledger): if ledger is None: await ws.send(json.dumps({"command":"account_info","account":account})) res = json.loads(await ws.recv()) - print(res) + print(json.dumps(res,indent=4,sort_keys=True)) else: await ws.send(json.dumps({"command":"account_info","account":account, "ledger_index":int(ledger)})) res = json.loads(await ws.recv()) - print(res) + print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -33,7 +33,7 @@ async def account_tx(ip, port, account): async with websockets.connect(address) as ws: await ws.send(json.dumps({"command":"account_tx","account":account})) res = json.loads(await ws.recv()) - print(res) + print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -43,7 +43,7 @@ async def tx(ip, port, tx_hash): async with websockets.connect(address) as ws: await ws.send(json.dumps({"command":"tx","transaction":tx_hash})) res = json.loads(await ws.recv()) - print(res) + print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.connectionclosederror as e: print(e) @@ -54,7 +54,7 @@ async def ledger_data(ip, port, ledger, limit): async with websockets.connect(address) as ws: await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"limit":int(limit),"binary":True})) res = json.loads(await ws.recv()) - print(res) + print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.connectionclosederror as e: print(e) @@ -102,14 +102,23 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, await ws.send(json.dumps({"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets})) res = json.loads(await ws.recv()) - print(res) + print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.connectionclosederror as e: print(e) +async def ledger(ip, port, ledger): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger)})) + res = json.loads(await ws.recv()) + print(json.dumps(res,indent=4,sort_keys=True)) + except websockets.exceptions.connectionclosederror as e: + print(e) parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full", "book_offers"]) +parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full", "book_offers","ledger"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') parser.add_argument('--hash') @@ -143,6 +152,9 @@ def run(args): elif args.action == "ledger_data_full": asyncio.get_event_loop().run_until_complete( ledger_data_full(args.ip, args.port, args.ledger)) + elif args.action == "ledger": + asyncio.get_event_loop().run_until_complete( + ledger(args.ip, args.port, args.ledger)) elif args.action == "book_offers": asyncio.get_event_loop().run_until_complete( book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer)) diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 4ab6ad60..fec6535f 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -89,7 +89,7 @@ buildResponse( return doAccountTx(request, backend); break; case ledger: - return doLedgerData(request, backend); + return doLedger(request, backend); break; case ledger_data: return doLedgerData(request, backend); @@ -201,8 +201,9 @@ public: boost::json::value raw = boost::json::parse(msg); // BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed"; boost::json::object request = raw.as_object(); + BOOST_LOG_TRIVIAL(debug) << " received request : " << request; auto response = buildResponse(request, backend_); - BOOST_LOG_TRIVIAL(debug) << __func__ << response; + BOOST_LOG_TRIVIAL(trace) << __func__ << response; response_ = boost::json::serialize(response); // Echo the message