From 78c6bde902eed8fe18bdfd31a2033a107f35dcee Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 23 Feb 2021 16:13:48 -0500 Subject: [PATCH] clean up ledger page interface. wire up account_tx --- handlers/AccountInfo.cpp | 4 ++-- handlers/AccountTx.cpp | 39 ++++++++++++++++++++++++++++++------ handlers/BookOffers.cpp | 2 +- handlers/LedgerData.cpp | 20 +++++++++--------- handlers/Tx.cpp | 4 ++-- reporting/ETLSource.cpp | 6 +++--- reporting/ETLSource.h | 8 ++++---- reporting/ReportingBackend.h | 21 +++++++++++-------- reporting/ReportingETL.cpp | 28 +++++++++++++------------- reporting/ReportingETL.h | 8 ++++---- websocket_server_async.cpp | 20 +++++++++--------- 11 files changed, 97 insertions(+), 63 deletions(-) diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp index 8cf79fb1..59aec6c2 100644 --- a/handlers/AccountInfo.cpp +++ b/handlers/AccountInfo.cpp @@ -21,8 +21,8 @@ #include #include #include +#include #include -#include // { // account: , @@ -43,7 +43,7 @@ boost::json::object doAccountInfo( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& postgres) { boost::json::object response; diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index 62f5a4eb..07dfabc7 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -18,8 +18,8 @@ //============================================================================== #include +#include #include -#include std::vector, @@ -27,7 +27,7 @@ std::vector& pgPool, - CassandraFlatMapBackend const& backend) + BackendInterface const& backend) { pg_params dbParams; @@ -129,7 +129,7 @@ doAccountTxStoredProcedure( boost::json::object doAccountTx( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool) { boost::json::object response; @@ -148,16 +148,43 @@ doAccountTx( return response; } + std::optional cursor; + if (request.contains("cursor")) + { + auto const& obj = request.at("cursor").as_object(); + std::optional ledgerSequence; + if (obj.contains("ledger_sequence")) + { + ledgerSequence = (uint32_t)obj.at("ledger_sequence").as_int64(); + } + std::optional transactionIndex; + if (obj.contains("transaction_index")) + { + transactionIndex = (uint32_t)obj.at("transaction_index").as_int64(); + } + if (!ledgerSequence || !transactionIndex) + { + response["error"] = + "malformed cursor. include transaction_index and " + "ledger_sequence in an object named \"cursor\""; + return response; + } + cursor = {*ledgerSequence, *transactionIndex}; + } + boost::json::array txns; - auto res = doAccountTxStoredProcedure(*account, pgPool, backend); - for (auto const& [sttx, meta] : res) + auto [blobs, retCursor] = + backend.fetchAccountTransactions(*account, cursor); + for (auto const& txnPlusMeta : blobs) { boost::json::object obj; - obj["transaction"] = getJson(*sttx); + auto [txn, meta] = deserializeTxPlusMeta(txnPlusMeta); + obj["transaction"] = getJson(*txn); obj["metadata"] = getJson(*meta); txns.push_back(obj); } response["transactions"] = txns; + response["cursor"] = {}; return response; } diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 8efa6619..7ac71836 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -7,9 +7,9 @@ #include #include #include +#include #include #include -#include std::optional ledgerSequenceFromRequest( diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 09b3e215..07b78328 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include // Get state nodes from a ledger // Inputs: // limit: integer, maximum number of entries @@ -38,21 +38,23 @@ boost::json::object doLedgerData( boost::json::object const& request, - CassandraFlatMapBackend const& backend) + BackendInterface const& backend) { boost::json::object response; uint32_t ledger = request.at("ledger_index").as_int64(); - std::optional marker = request.contains("marker") - ? request.at("marker").as_int64() - : std::optional{}; + ripple::uint256 cursor; + if (request.contains("cursor")) + { + cursor.parseHex(request.at("cursor").as_string().c_str()); + } bool binary = request.contains("binary") ? request.at("binary").as_bool() : false; size_t limit = request.contains("limit") ? request.at("limit").as_int64() : (binary ? 2048 : 256); BackendInterface::LedgerPage page; auto start = std::chrono::system_clock::now(); - page = backend.fetchLedgerPage(marker, ledger, limit); + page = backend.fetchLedgerPage(cursor, ledger, limit); auto end = std::chrono::system_clock::now(); @@ -61,7 +63,7 @@ doLedgerData( .count(); boost::json::array objects; std::vector& results = page.objects; - std::optional& returnedMarker = page.cursor; + std::optional const& returnedCursor = page.cursor; BOOST_LOG_TRIVIAL(debug) << "doUpperBound returned " << results.size() << " results"; for (auto const& [key, object] : results) @@ -79,8 +81,8 @@ doLedgerData( objects.push_back(getJson(sle)); } response["objects"] = objects; - if (returnedMarker) - response["marker"] = returnedMarker.value(); + if (returnedCursor) + response["marker"] = ripple::strHex(*returnedCursor); response["num_results"] = results.size(); response["db_time"] = time; diff --git a/handlers/Tx.cpp b/handlers/Tx.cpp index 853fc9b1..65cc83bc 100644 --- a/handlers/Tx.cpp +++ b/handlers/Tx.cpp @@ -18,8 +18,8 @@ //============================================================================== #include +#include #include -#include // { // transaction: @@ -28,7 +28,7 @@ boost::json::object doTx( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& postgres) { boost::json::object response; diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index 83bd2399..8a6ff90a 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -31,7 +31,7 @@ // Primarly used in read-only mode, to monitor when ledgers are validated ETLSource::ETLSource( boost::json::object const& config, - CassandraFlatMapBackend& backend, + BackendInterface& backend, NetworkValidatedLedgers& networkValidatedLedgers, boost::asio::io_context& ioContext) : ioc_(ioContext) @@ -400,7 +400,7 @@ public: process( std::unique_ptr& stub, grpc::CompletionQueue& cq, - CassandraFlatMapBackend& backend, + BackendInterface& backend, bool abort = false) { BOOST_LOG_TRIVIAL(info) << "Processing response. " @@ -582,7 +582,7 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) } ETLLoadBalancer::ETLLoadBalancer( boost::json::array const& config, - CassandraFlatMapBackend& backend, + BackendInterface& backend, NetworkValidatedLedgers& nwvl, boost::asio::io_context& ioContext) { diff --git a/reporting/ETLSource.h b/reporting/ETLSource.h index 6befc0a8..2b942b10 100644 --- a/reporting/ETLSource.h +++ b/reporting/ETLSource.h @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include @@ -85,7 +85,7 @@ class ETLSource // used for retrying connections boost::asio::steady_timer timer_; - CassandraFlatMapBackend& backend_; + BackendInterface& backend_; public: bool @@ -113,7 +113,7 @@ public: /// Primarly used in read-only mode, to monitor when ledgers are validated ETLSource( boost::json::object const& config, - CassandraFlatMapBackend& backend, + BackendInterface& backend, NetworkValidatedLedgers& networkValidatedLedgers, boost::asio::io_context& ioContext); @@ -285,7 +285,7 @@ private: public: ETLLoadBalancer( boost::json::array const& config, - CassandraFlatMapBackend& backend, + BackendInterface& backend, NetworkValidatedLedgers& nwvl, boost::asio::io_context& ioContext); diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index e8b1de78..d9dde638 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -55,7 +55,7 @@ flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); void flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); -class CassandraFlatMapBackend : BackendInterface +class CassandraFlatMapBackend : public BackendInterface { private: // convenience function for one-off queries. For normal reads and writes, @@ -141,7 +141,7 @@ public: { } - ~CassandraFlatMapBackend() + ~CassandraFlatMapBackend() override { if (open_) close(); @@ -915,16 +915,23 @@ public: } BackendInterface::LedgerPage fetchLedgerPage( - std::optional const& cursor, + std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound"; CassStatement* statement = cass_prepared_bind(upperBound_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - int64_t cursorVal = cursor.has_value() ? cursor.value() : INT64_MIN; - CassError rc = cass_statement_bind_int64(statement, 0, cursorVal); + int64_t intCursor = INT64_MIN; + if (cursor) + { + auto token = getToken(cursor->data()); + if (token) + intCursor = *token; + } + + CassError rc = cass_statement_bind_int64(statement, 0, intCursor); if (rc != CASS_OK) { cass_statement_free(statement); @@ -1017,9 +1024,7 @@ public: { results.push_back({keys[i], objs[i]}); } - auto token = getToken(results[results.size() - 1].key.data()); - assert(token); - return {results, token}; + return {results, keys[keys.size() - 1]}; } return {{}, {}}; diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index d64cf3f5..a0384faf 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include @@ -76,7 +77,7 @@ ReportingETL::insertTransactions( auto journal = ripple::debugLog(); accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal); std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; - flatMapBackend_.writeTransaction( + flatMapBackend_->writeTransaction( std::move(keyStr), ledger.seq, std::move(*raw), @@ -89,7 +90,7 @@ std::optional ReportingETL::loadInitialLedger(uint32_t startingSequence) { // check that database is actually empty - auto ledger = flatMapBackend_.fetchLedgerBySequence(startingSequence); + auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence); if (ledger) { BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " @@ -128,9 +129,9 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) { for (auto& data : accountTxData) { - flatMapBackend_.writeAccountTransactions(std::move(data)); + flatMapBackend_->writeAccountTransactions(std::move(data)); } - bool success = flatMapBackend_.writeLedger( + bool success = flatMapBackend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header())); } auto end = std::chrono::system_clock::now(); @@ -155,7 +156,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) size_t numAttempts = 0; while (!stopping_) { - auto ledger = flatMapBackend_.fetchLedgerBySequence(ledgerSequence); + auto ledger = flatMapBackend_->fetchLedgerBySequence(ledgerSequence); if (!ledger) { @@ -292,7 +293,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) } assert(not(isCreated and isDeleted)); - flatMapBackend_.writeLedgerObject( + flatMapBackend_->writeLedgerObject( std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()), @@ -302,9 +303,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) } for (auto& data : accountTxData) { - flatMapBackend_.writeAccountTransactions(std::move(data)); + flatMapBackend_->writeAccountTransactions(std::move(data)); } - bool success = flatMapBackend_.writeLedger( + bool success = flatMapBackend_->writeLedger( lgrInfo, std::move(*rawData.mutable_ledger_header())); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -347,7 +348,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence) << "Starting etl pipeline"; writing_ = true; - auto parent = flatMapBackend_.fetchLedgerBySequence(startSequence - 1); + auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1); if (!parent) { assert(false); @@ -482,7 +483,7 @@ void ReportingETL::monitor() { std::optional latestSequence = - flatMapBackend_.fetchLatestLedgerSequence(); + flatMapBackend_->fetchLatestLedgerSequence(); if (!latestSequence) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " @@ -637,17 +638,16 @@ ReportingETL::ReportingETL( boost::asio::io_context& ioc) : publishStrand_(ioc) , ioContext_(ioc) - , flatMapBackend_( - config.at("database").as_object().at("cassandra").as_object()) + , flatMapBackend_(makeBackend(config)) , pgPool_(make_PgPool( config.at("database").as_object().at("postgres").as_object())) , loadBalancer_( config.at("etl_sources").as_array(), - flatMapBackend_, + *flatMapBackend_, networkValidatedLedgers_, ioc) { - flatMapBackend_.open(); + flatMapBackend_->open(); initSchema(pgPool_); } diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index f14017c0..86537f1d 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -25,10 +25,10 @@ #include #include #include +#include #include #include #include -#include #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include @@ -59,7 +59,7 @@ struct AccountTransactionsData; class ReportingETL { private: - CassandraFlatMapBackend flatMapBackend_; + std::unique_ptr flatMapBackend_; std::shared_ptr pgPool_; std::thread worker_; @@ -321,10 +321,10 @@ public: return loadBalancer_; } - CassandraFlatMapBackend& + BackendInterface& getFlatMapBackend() { - return flatMapBackend_; + return *flatMapBackend_; } std::shared_ptr& diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 9e37ee5b..6cd09e52 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -54,32 +54,32 @@ std::unordered_map commandMap{ boost::json::object doAccountInfo( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& postgres); boost::json::object doTx( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool); boost::json::object doAccountTx( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool); boost::json::object doLedgerData( boost::json::object const& request, - CassandraFlatMapBackend const& backend); + BackendInterface const& backend); boost::json::object doBookOffers( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool); boost::json::object buildResponse( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool) { std::string command = request.at("command").as_string().c_str(); @@ -122,14 +122,14 @@ class session : public std::enable_shared_from_this boost::beast::websocket::stream ws_; boost::beast::flat_buffer buffer_; std::string response_; - CassandraFlatMapBackend const& backend_; + BackendInterface const& backend_; std::shared_ptr& pgPool_; public: // Take ownership of the socket explicit session( boost::asio::ip::tcp::socket&& socket, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool) : ws_(std::move(socket)), backend_(backend), pgPool_(pgPool) { @@ -242,14 +242,14 @@ class listener : public std::enable_shared_from_this { boost::asio::io_context& ioc_; boost::asio::ip::tcp::acceptor acceptor_; - CassandraFlatMapBackend const& backend_; + BackendInterface const& backend_; std::shared_ptr& pgPool_; public: listener( boost::asio::io_context& ioc, boost::asio::ip::tcp::endpoint endpoint, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pgPool) : ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool) {