From 943cac57ea3be3cc350da5f1a23f989f20e1dd30 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 23 Dec 2020 13:18:39 -0500 Subject: [PATCH] tx handler --- CMakeLists.txt | 4 +- handlers/RPCHelpers.cpp | 68 +++++++++++++++++++++++++++++++ handlers/RPCHelpers.h | 38 ++++++------------ handlers/Tx.cpp | 69 ++++++++++++++++++++++++++++++++ reporting/Pg.cpp | 34 ++++++++++++++++ reporting/Pg.h | 5 +++ reporting/ReportingBackend.h | 77 ++++++++++++++++++++++++++++++++++++ reporting/ReportingETL.h | 6 +++ test.py | 16 +++++++- websocket_server_async.cpp | 34 +++++++++++----- 10 files changed, 314 insertions(+), 37 deletions(-) create mode 100644 handlers/RPCHelpers.cpp create mode 100644 handlers/Tx.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index af74c560..a971909d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,7 +60,9 @@ target_sources(reporting PRIVATE reporting/Pg.cpp reporting/DBHelpers.cpp reporting/ReportingETL.cpp - handlers/AccountInfo.cpp) + handlers/AccountInfo.cpp + handlers/Tx.cpp + handlers/RPCHelpers.cpp) message(${Boost_LIBRARIES}) diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp new file mode 100644 index 00000000..c33aab91 --- /dev/null +++ b/handlers/RPCHelpers.cpp @@ -0,0 +1,68 @@ +#include + +std::optional +accountFromStringStrict(std::string const& account) +{ + boost::optional result; + + auto const publicKey = ripple::parseBase58( + ripple::TokenType::AccountPublic, account); + + if (publicKey) + result = ripple::calcAccountID(*publicKey); + else + result = ripple::parseBase58(account); + + if (result) + return result.value(); + else + return {}; +} +std::pair< + std::shared_ptr, + std::shared_ptr> +deserializeTxPlusMeta( + std::pair, std::vector> const& + blobs) +{ + std::pair< + std::shared_ptr, + std::shared_ptr> + result; + { + ripple::SerialIter s{blobs.first.data(), blobs.first.size()}; + result.first = std::make_shared(s); + } + { + ripple::SerialIter s{blobs.second.data(), blobs.second.size()}; + result.second = + std::make_shared(s, ripple::sfMetadata); + } + return result; +} + +boost::json::object +getJson(ripple::STBase const& obj) +{ + auto start = std::chrono::system_clock::now(); + boost::json::value value = boost::json::parse( + obj.getJson(ripple::JsonOptions::none).toStyledString()); + auto end = std::chrono::system_clock::now(); + value.as_object()["deserialization_time_microseconds"] = + std::chrono::duration_cast(end - start) + .count(); + return value.as_object(); +} + +boost::json::object +getJson(ripple::SLE const& sle) +{ + auto start = std::chrono::system_clock::now(); + boost::json::value value = boost::json::parse( + sle.getJson(ripple::JsonOptions::none).toStyledString()); + auto end = std::chrono::system_clock::now(); + value.as_object()["deserialization_time_microseconds"] = + std::chrono::duration_cast(end - start) + .count(); + return value.as_object(); +} diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index 6876f9d4..86f701e1 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -3,36 +3,22 @@ #define XRPL_REPORTING_RPCHELPERS_H_INCLUDED #include +#include #include std::optional -accountFromStringStrict(std::string const& account) -{ - boost::optional result; +accountFromStringStrict(std::string const& account); - auto const publicKey = ripple::parseBase58( - ripple::TokenType::AccountPublic, account); +std::pair< + std::shared_ptr, + std::shared_ptr> +deserializeTxPlusMeta( + std::pair, std::vector> const& + blobs); - if (publicKey) - result = ripple::calcAccountID(*publicKey); - else - result = ripple::parseBase58(account); - - if (result) - return result.value(); - else - return {}; -} boost::json::object -getJson(ripple::SLE const& sle) -{ - auto start = std::chrono::system_clock::now(); - boost::json::value value = boost::json::parse( - sle.getJson(ripple::JsonOptions::none).toStyledString()); - auto end = std::chrono::system_clock::now(); - value.as_object()["deserialization_time_microseconds"] = - std::chrono::duration_cast(end - start) - .count(); - return value.as_object(); -} +getJson(ripple::STBase const& obj); + +boost::json::object +getJson(ripple::SLE const& sle); #endif diff --git a/handlers/Tx.cpp b/handlers/Tx.cpp new file mode 100644 index 00000000..0964d79d --- /dev/null +++ b/handlers/Tx.cpp @@ -0,0 +1,69 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2014 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +// { +// transaction: +// } + +boost::json::object +doTx( + boost::json::object const& request, + CassandraFlatMapBackend const& backend, + std::shared_ptr& postgres) +{ + boost::json::object response; + if (!request.contains("transaction")) + { + response["error"] = "Please specify a transaction hash"; + return response; + } + ripple::uint256 hash; + if (!hash.parseHex(request.at("transaction").as_string().c_str())) + { + response["error"] = "Error parsing transaction hash"; + return response; + } + + auto range = getLedgerRange(postgres); + if (!range) + { + response["error"] = "Database is empty"; + return response; + } + + auto dbResponse = backend.fetchTransaction(hash.data()); + if (!dbResponse) + { + response["error"] = "Transaction not found in Cassandra"; + response["ledger_range"] = std::to_string(range->lower()) + " - " + + std::to_string(range->upper()); + + return response; + } + + auto [sttx, meta] = deserializeTxPlusMeta(dbResponse.value()); + response["transaction"] = getJson(*sttx); + response["meta"] = getJson(*meta); + return response; +} + diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 862792b5..229886a4 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -1534,3 +1534,37 @@ getLedger( return info; } + +std::optional +getLedgerRange(std::shared_ptr& pgPool) +{ + auto range = PgQuery(pgPool)("SELECT complete_ledgers()"); + if (!range) + return {}; + + std::string res{range.c_str()}; + try + { + size_t minVal = 0; + size_t maxVal = 0; + if (res == "empty" || res == "error" || res.empty()) + return {}; + else if (size_t delim = res.find('-'); delim != std::string::npos) + { + minVal = std::stol(res.substr(0, delim)); + maxVal = std::stol(res.substr(delim + 1)); + } + else + { + minVal = maxVal = std::stol(res); + } + return LedgerRange{minVal, maxVal}; + } + catch (std::exception&) + { + BOOST_LOG_TRIVIAL(error) + << __func__ << " : " + << "Error parsing result of getCompleteLedgers()"; + } + return {}; +} diff --git a/reporting/Pg.h b/reporting/Pg.h index 00675a28..43b35884 100644 --- a/reporting/Pg.h +++ b/reporting/Pg.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -523,4 +524,8 @@ getLedger( std::variant const& whichLedger, std::shared_ptr& pgPool); +using LedgerRange = boost::icl::closed_interval; +std::optional +getLedgerRange(std::shared_ptr& pgPool); + #endif // RIPPLE_CORE_PG_H_INCLUDED diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index 3fdfaaef..acaf8c37 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -700,6 +700,83 @@ public: << " microseconds"; return result; } + std::optional< + std::pair, std::vector>> + fetchTransaction(void const* hash) const + { + BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; + auto start = std::chrono::system_clock::now(); + CassStatement* statement = cass_prepared_bind(selectTransaction_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + CassError rc = cass_statement_bind_bytes( + statement, 0, static_cast(hash), 32); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + CassFuture* fut; + do + { + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + + CassResult const* res = cass_future_get_result(fut); + cass_statement_free(statement); + cass_future_free(fut); + + CassRow const* row = cass_result_first_row(res); + if (!row) + { + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; + cass_result_free(res); + return {}; + } + cass_byte_t const* txBuf; + std::size_t txBufSize; + rc = cass_value_get_bytes( + cass_row_get_column(row, 0), &txBuf, &txBufSize); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + std::vector txResult{txBuf, txBuf + txBufSize}; + cass_byte_t const* metaBuf; + std::size_t metaBufSize; + rc = cass_value_get_bytes( + cass_row_get_column(row, 0), &metaBuf, &metaBufSize); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + std::vector metaResult{metaBuf, metaBuf + metaBufSize}; + cass_result_free(res); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(debug) + << "Fetched from cassandra in " + << std::chrono::duration_cast( + end - start) + .count() + << " microseconds"; + return {{txResult, metaResult}}; + } /* std::pair>>, Status> doUpperBound(uint256 marker, std::uint32_t seq, std::uint32_t limit) diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 11a4f2e0..7fc1d676 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -326,6 +326,12 @@ public: return flatMapBackend_; } + std::shared_ptr& + getPgPool() + { + return pgPool_; + } + private: void doWork(); diff --git a/test.py b/test.py index 551b8422..24fb445a 100755 --- a/test.py +++ b/test.py @@ -22,13 +22,24 @@ async def account_info(ip, port): except websockets.exceptions.ConnectionClosedError as e: print(e) +async def tx(ip, port, tx_hash): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"tx","transaction":tx_hash})) + res = json.loads(await ws.recv()) + print(res) + except websockets.exceptions.ConnectionClosedError as e: + print(e) + parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info"]) +parser.add_argument('action', choices=["account_info", "tx"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') +parser.add_argument('--hash') @@ -39,6 +50,9 @@ def run(args): if args.action == "account_info": asyncio.get_event_loop().run_until_complete( account_info(args.ip, args.port)) + if args.action == "tx": + asyncio.get_event_loop().run_until_complete( + tx(args.ip, args.port, args.hash)) else: print("incorrect arguments") diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 19eb1888..1d0fa07e 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -47,15 +47,23 @@ doAccountInfo( boost::json::object const& request, CassandraFlatMapBackend const& backend); boost::json::object +doTx( + boost::json::object const& request, + CassandraFlatMapBackend const& backend, + std::shared_ptr& pgPool); + +boost::json::object buildResponse( boost::json::object const& request, - CassandraFlatMapBackend const& backend) + CassandraFlatMapBackend const& backend, + std::shared_ptr& pgPool) { std::string command = request.at("command").as_string().c_str(); boost::json::object response; switch (commandMap[command]) { case tx: + return doTx(request, backend, pgPool); break; case account_tx: break; @@ -81,14 +89,17 @@ class session : public std::enable_shared_from_this { boost::beast::websocket::stream ws_; boost::beast::flat_buffer buffer_; + std::string response_; CassandraFlatMapBackend const& backend_; + std::shared_ptr& pgPool_; public: // Take ownership of the socket explicit session( boost::asio::ip::tcp::socket&& socket, - CassandraFlatMapBackend const& backend) - : ws_(std::move(socket)), backend_(backend) + CassandraFlatMapBackend const& backend, + std::shared_ptr& pgPool) + : ws_(std::move(socket)), backend_(backend), pgPool_(pgPool) { } @@ -164,13 +175,14 @@ public: boost::json::value raw = boost::json::parse(msg); // BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed"; boost::json::object request = raw.as_object(); - auto response = buildResponse(request, backend_); + auto response = buildResponse(request, backend_, pgPool_); BOOST_LOG_TRIVIAL(debug) << __func__ << response; + response_ = boost::json::serialize(response); // Echo the message ws_.text(ws_.got_text()); ws_.async_write( - boost::asio::buffer(boost::json::serialize(response)), + boost::asio::buffer(response_), boost::beast::bind_front_handler( &session::on_write, shared_from_this())); } @@ -199,13 +211,15 @@ class listener : public std::enable_shared_from_this boost::asio::io_context& ioc_; boost::asio::ip::tcp::acceptor acceptor_; CassandraFlatMapBackend const& backend_; + std::shared_ptr& pgPool_; public: listener( boost::asio::io_context& ioc, boost::asio::ip::tcp::endpoint endpoint, - CassandraFlatMapBackend const& backend) - : ioc_(ioc), acceptor_(ioc), backend_(backend) + CassandraFlatMapBackend const& backend, + std::shared_ptr& pgPool) + : ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool) { boost::beast::error_code ec; @@ -270,7 +284,8 @@ private: else { // Create the session and run it - std::make_shared(std::move(socket), backend_)->run(); + std::make_shared(std::move(socket), backend_, pgPool_) + ->run(); } // Accept another connection @@ -376,7 +391,8 @@ main(int argc, char* argv[]) std::make_shared( ioc, boost::asio::ip::tcp::endpoint{address, port}, - etl.getFlatMapBackend()) + etl.getFlatMapBackend(), + etl.getPgPool()) ->run(); // Run the I/O service on the requested number of threads