From 4360d4b2197f167a3a6a4de70b053ce233a43308 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Mon, 11 Jan 2021 15:20:49 -0500 Subject: [PATCH] ledger_data --- CMakeLists.txt | 3 +- handlers/LedgerData.cpp | 140 ++++++++++++++++ reporting/ReportingBackend.h | 306 +++++++++++++++++++++++------------ test.py | 51 +++++- websocket_server_async.cpp | 12 +- 5 files changed, 399 insertions(+), 113 deletions(-) create mode 100644 handlers/LedgerData.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 80264830..960f9eb4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,7 +63,8 @@ target_sources(reporting PRIVATE handlers/AccountInfo.cpp handlers/Tx.cpp handlers/RPCHelpers.cpp - handlers/AccountTx.cpp) + handlers/AccountTx.cpp + handlers/LedgerData.cpp) message(${Boost_LIBRARIES}) diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp new file mode 100644 index 00000000..dd36079c --- /dev/null +++ b/handlers/LedgerData.cpp @@ -0,0 +1,140 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2014 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +// Get state nodes from a ledger +// Inputs: +// limit: integer, maximum number of entries +// marker: opaque, resume point +// binary: boolean, format +// type: string // optional, defaults to all ledger node types +// Outputs: +// ledger_hash: chosen ledger's hash +// ledger_index: chosen ledger's index +// state: array of state nodes +// marker: resume point, if any +// +// +boost::json::object +doLedgerData( + boost::json::object const& request, + CassandraFlatMapBackend const& backend) +{ + boost::json::object response; + uint32_t ledger = request.at("ledger_index").as_int64(); + + std::optional marker = request.contains("marker") + ? request.at("marker").as_int64() + : std::optional{}; + size_t limit = + request.contains("limit") ? request.at("limit").as_int64() : 200; + auto start = std::chrono::system_clock::now(); + auto [results, returnedMarker] = + backend.doUpperBound(marker, ledger, limit); + + auto end = std::chrono::system_clock::now(); + + auto time = + std::chrono::duration_cast(end - start) + .count(); + boost::json::array objects; + for (auto const& [key, object] : results) + { + ripple::STLedgerEntry sle{ + ripple::SerialIter{object.data(), object.size()}, key}; + objects.push_back(getJson(sle)); + } + response["objects"] = objects; + if (returnedMarker) + response["marker"] = returnedMarker.value(); + + response["num_results"] = results.size(); + response["db_time"] = time; + + return response; +} + +/* +std::pair +doLedgerDataGrpc( + RPC::GRPCContext& context) +{ + org::xrpl::rpc::v1::GetLedgerDataRequest& request = context.params; + org::xrpl::rpc::v1::GetLedgerDataResponse response; + grpc::Status status = grpc::Status::OK; + + std::shared_ptr ledger; + if (RPC::ledgerFromRequest(ledger, context)) + { + grpc::Status errorStatus{ + grpc::StatusCode::NOT_FOUND, "ledger not found"}; + return {response, errorStatus}; + } + + ReadView::key_type key = ReadView::key_type(); + if (request.marker().size() != 0) + { + key = uint256::fromVoid(request.marker().data()); + if (key.size() != request.marker().size()) + { + grpc::Status errorStatus{ + grpc::StatusCode::INVALID_ARGUMENT, "marker malformed"}; + return {response, errorStatus}; + } + } + + auto e = ledger->sles.end(); + ReadView::key_type stopKey = ReadView::key_type(); + if (request.end_marker().size() != 0) + { + stopKey = uint256::fromVoid(request.end_marker().data()); + if (stopKey.size() != request.marker().size()) + { + grpc::Status errorStatus{ + grpc::StatusCode::INVALID_ARGUMENT, "end marker malformed"}; + return {response, errorStatus}; + } + e = ledger->sles.upper_bound(stopKey); + } + + int maxLimit = RPC::Tuning::pageLength(true); + + for (auto i = ledger->sles.upper_bound(key); i != e; ++i) + { + auto sle = ledger->read(keylet::unchecked((*i)->key())); + if (maxLimit-- <= 0) + { + // Stop processing before the current key. + auto k = sle->key(); + --k; + response.set_marker(k.data(), k.size()); + break; + } + auto stateObject = response.mutable_ledger_objects()->add_objects(); + Serializer s; + sle->add(s); + stateObject->set_data(s.peekData().data(), s.getLength()); + stateObject->set_key(sle->key().data(), sle->key().size()); + } + return {response, status}; +} +*/ diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index 4c8b8ac2..e687c94f 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -80,6 +80,8 @@ private: const CassPrepared* insertTransaction_ = nullptr; const CassPrepared* selectTransaction_ = nullptr; const CassPrepared* selectObject_ = nullptr; + const CassPrepared* upperBound_ = nullptr; + const CassPrepared* getToken_ = nullptr; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -547,10 +549,10 @@ public: */ cass_future_free(prepare_future); - /* - query.str(""); - query << "SELECT hash, entry FROM " << stateTableName - << " WHERE TOKEN(hash) >= TOKEN(?) and seq <= ?" + query = {}; + query << "SELECT key, object FROM " << tableName << "flat " + << " WHERE TOKEN(key) >= ? and sequence <= ?" + << " and object > 0x" << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; @@ -566,9 +568,9 @@ public: cass_future_free(prepare_future); std::stringstream ss; - ss << "nodestore: Error preparing state table select : " << rc - << ", " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + ss << "nodestore: Error preparing upperBound : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str(); continue; } @@ -579,7 +581,30 @@ public: // object // cass_future_free(prepare_future); - */ + query = {}; + query << "SELECT TOKEN(key) FROM " << tableName << "flat " + << " WHERE key = ? LIMIT 1"; + + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + // Wait for the statement to prepare and get the result + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + // Handle error + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing getToken : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str(); + continue; + } + + getToken_ = cass_future_get_prepared(prepare_future); + setupPreparedStatements = true; } @@ -621,6 +646,16 @@ public: cass_prepared_free(selectObject_); selectObject_ = nullptr; } + if (upperBound_) + { + cass_prepared_free(upperBound_); + upperBound_ = nullptr; + } + if (getToken_) + { + cass_prepared_free(getToken_); + getToken_ = nullptr; + } work_.reset(); ioThread_.join(); } @@ -702,6 +737,64 @@ public: << " microseconds"; return result; } + + std::optional + getToken(void const* key) const + { + BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; + auto start = std::chrono::system_clock::now(); + CassStatement* statement = cass_prepared_bind(getToken_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + CassError rc = cass_statement_bind_bytes( + statement, 0, static_cast(key), 32); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + CassFuture* fut; + do + { + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + + CassResult const* res = cass_future_get_result(fut); + cass_statement_free(statement); + cass_future_free(fut); + + CassRow const* row = cass_result_first_row(res); + if (!row) + { + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; + cass_result_free(res); + return {}; + } + cass_int64_t token; + rc = cass_value_get_int64(cass_row_get_column(row, 0), &token); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + cass_result_free(res); + if (token == INT64_MAX) + return {}; + return token + 1; + } + std::optional< std::pair, std::vector>> fetchTransaction(void const* hash) const @@ -779,122 +872,121 @@ public: << " microseconds"; return {{txResult, metaResult}}; } - /* - std::pair>>, - Status> doUpperBound(uint256 marker, std::uint32_t seq, std::uint32_t limit) - override + struct LedgerObject + { + ripple::uint256 key; + std::vector blob; + }; + std::pair, std::optional> + doUpperBound( + std::optional marker, + std::uint32_t seq, + std::uint32_t limit) const + { + CassStatement* statement = cass_prepared_bind(upperBound_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + int64_t markerVal = marker ? marker.value() : INT64_MIN; + + CassError rc = cass_statement_bind_int64(statement, 0, markerVal); + if (rc != CASS_OK) { - CassStatement* statement = cass_prepared_bind(upperBound_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(marker.begin()), - keyBytes_); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra hash to doUpperBound query: " << rc - << ", " << cass_error_desc(rc); - return {{}, backendError}; - } - - rc = cass_statement_bind_int64(statement, 1, seq); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra seq to doUpperBound query: " << rc << - ", " - << cass_error_desc(rc); - return {{}, backendError}; - } - - rc = cass_statement_bind_int32(statement, 2, limit + 1); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra limit to doUpperBound query: " << rc - << ", " << cass_error_desc(rc); - return {{}, backendError}; - } - - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ++counters_.readRetries; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); cass_statement_free(statement); - cass_future_free(fut); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra hash to doUpperBound query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } - std::vector>> result = {}; + rc = cass_statement_bind_int64(statement, 1, seq); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra seq to doUpperBound query: " << rc << ", " + << cass_error_desc(rc); + return {}; + } - CassIterator* iter = cass_iterator_from_result(res); - while (cass_iterator_next(iter)) + rc = cass_statement_bind_int32(statement, 2, limit + 1); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra limit to doUpperBound query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + + CassFuture* fut; + do + { + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) { - CassRow const* row = cass_iterator_get_row(iter); + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); - cass_byte_t const* outData; - std::size_t outSize; + CassResult const* res = cass_future_get_result(fut); + cass_statement_free(statement); + cass_future_free(fut); - CassValue const* hash = cass_row_get_column(row, 0); - rc = cass_value_get_bytes(hash, &outData, &outSize); - if (rc != CASS_OK) - { - cass_iterator_free(iter); + std::vector result; - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); + CassIterator* iter = cass_iterator_from_result(res); + while (cass_iterator_next(iter)) + { + CassRow const* row = cass_iterator_get_row(iter); - return {{}, backendError}; - } - uint256 resultHash = uint256::fromVoid(outData); + cass_byte_t const* outData; + std::size_t outSize; - CassValue const* entry = cass_row_get_column(row, 1); - rc = cass_value_get_bytes(entry, &outData, &outSize); - if (rc != CASS_OK) - { - cass_iterator_free(iter); + CassValue const* hash = cass_row_get_column(row, 0); + rc = cass_value_get_bytes(hash, &outData, &outSize); + if (rc != CASS_OK) + { + cass_iterator_free(iter); - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + ripple::uint256 resultHash = ripple::uint256::fromVoid(outData); - return {{}, backendError}; - } + CassValue const* entry = cass_row_get_column(row, 1); + rc = cass_value_get_bytes(entry, &outData, &outSize); + if (rc != CASS_OK) + { + cass_iterator_free(iter); - nudb::detail::buffer bf; - auto [data, size] = lz4_decompress(outData, outSize, bf); - auto slice = Slice(data, size); - std::shared_ptr resultBlob = - std::make_shared(slice.begin(), slice.end()); + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + if (outSize > 0) + { + std::vector resultBlob{ + outData, outData + outSize}; result.push_back({resultHash, resultBlob}); } - - return {result, Status::ok}; } -*/ + if (result.size()) + { + auto token = getToken(result[result.size() - 1].key.data()); + assert(token); + return {result, token}; + } + + return {result, {}}; + } bool canFetchBatch() diff --git a/test.py b/test.py index a209be2a..60a169a1 100755 --- a/test.py +++ b/test.py @@ -44,19 +44,58 @@ async def tx(ip, port, tx_hash): await ws.send(json.dumps({"command":"tx","transaction":tx_hash})) res = json.loads(await ws.recv()) print(res) - except websockets.exceptions.ConnectionClosedError as e: + except websockets.exceptions.connectionclosederror as e: + print(e) + + +async def ledger_data(ip, port, ledger, limit): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"limit":int(limit)})) + res = json.loads(await ws.recv()) + print(res) + except websockets.exceptions.connectionclosederror as e: + print(e) + +async def ledger_data_full(ip, port, ledger): + address = 'ws://' + str(ip) + ':' + str(port) + try: + + async with websockets.connect(address) as ws: + marker = None + while True: + if marker is None: + await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger)})) + res = json.loads(await ws.recv()) + print(res) + + else: + await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"marker":marker})) + res = json.loads(await ws.recv()) + + if "marker" in res: + marker = int(res["marker"]) + print(marker) + elif "result" in res and "marker" in res["result"]: + marker = res["result"]["marker"] + print(marker) + else: + print("done") + break + except websockets.exceptions.connectionclosederror as e: print(e) - parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "account_tx"]) +parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') parser.add_argument('--hash') parser.add_argument('--account', default="rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod") parser.add_argument('--ledger') +parser.add_argument('--limit', default='200') @@ -73,6 +112,12 @@ def run(args): elif args.action == "account_tx": asyncio.get_event_loop().run_until_complete( account_tx(args.ip, args.port, args.account)) + elif args.action == "ledger_data": + asyncio.get_event_loop().run_until_complete( + ledger_data(args.ip, args.port, args.ledger, args.limit)) + elif args.action == "ledger_data_full": + asyncio.get_event_loop().run_until_complete( + ledger_data_full(args.ip, args.port, args.ledger)) else: print("incorrect arguments") diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 5a7ea066..fa3a0624 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -35,12 +35,13 @@ #include //------------------------------------------------------------------------------ -enum RPCCommand { tx, account_tx, ledger, account_info }; +enum RPCCommand { tx, account_tx, ledger, account_info, ledger_data }; std::unordered_map commandMap{ {"tx", tx}, {"account_tx", account_tx}, {"ledger", ledger}, - {"account_info", account_info}}; + {"account_info", account_info}, + {"ledger_data", ledger_data}}; boost::json::object doAccountInfo( @@ -57,6 +58,10 @@ doAccountTx( boost::json::object const& request, CassandraFlatMapBackend const& backend, std::shared_ptr& pgPool); +boost::json::object +doLedgerData( + boost::json::object const& request, + CassandraFlatMapBackend const& backend); boost::json::object buildResponse( @@ -76,6 +81,9 @@ buildResponse( break; case ledger: break; + case ledger_data: + return doLedgerData(request, backend); + break; case account_info: return doAccountInfo(request, backend, pgPool); break;