From 8ee213f6bcce078384fe8b02313e14252388cde8 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Tue, 18 May 2021 08:06:46 -0700 Subject: [PATCH] Subscribe/Unsubscribe RPC handlers (#2) Subscribe to ledgers, transactions and accounts --- handlers/RPCHelpers.cpp | 35 ++++- handlers/Subscribe.cpp | 234 +++++++++++++++++++++++++++++++++ reporting/ReportingETL.cpp | 2 +- reporting/server/session.cpp | 48 +++++++ reporting/server/session.h | 243 +++++++++++++++++++++++++++++++++++ 5 files changed, 557 insertions(+), 5 deletions(-) create mode 100644 handlers/Subscribe.cpp create mode 100644 reporting/server/session.cpp create mode 100644 reporting/server/session.h diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index 79a49ea4..0d891d85 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -4,11 +4,21 @@ std::optional accountFromStringStrict(std::string const& account) { + auto blob = ripple::strUnHex(account); + + boost::optional publicKey = {}; + if (blob && ripple::publicKeyType(ripple::makeSlice(*blob))) + { + publicKey = ripple::PublicKey( + ripple::Slice{blob->data(), blob->size()}); + } + else + { + publicKey = ripple::parseBase58( + ripple::TokenType::AccountPublic, account); + } + boost::optional result; - - auto const publicKey = ripple::parseBase58( - ripple::TokenType::AccountPublic, account); - if (publicKey) result = ripple::calcAccountID(*publicKey); else @@ -41,6 +51,23 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs) return result; } + +std::pair< + std::shared_ptr, + std::shared_ptr> +deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq) +{ + auto [tx, meta] = deserializeTxPlusMeta(blobs); + + std::shared_ptr m = + std::make_shared( + tx->getTransactionID(), + seq, + *meta); + + return {tx, m}; +} + boost::json::object getJson(ripple::STBase const& obj) { diff --git a/handlers/Subscribe.cpp b/handlers/Subscribe.cpp new file mode 100644 index 00000000..002bc53e --- /dev/null +++ b/handlers/Subscribe.cpp @@ -0,0 +1,234 @@ +#include +#include +#include + +static std::unordered_set validStreams { + "ledger", + "transactions" }; + +boost::json::value +validateStreams(boost::json::object const& request) +{ + if (!request.at("streams").is_array()) + { + return "missing or invalid streams"; + } + + boost::json::array const& streams = request.at("streams").as_array(); + + for (auto const& stream : streams) + { + if (!stream.is_string()) + { + return "streams must be strings"; + } + + std::string s = stream.as_string().c_str(); + + if (validStreams.find(s) == validStreams.end()) + { + return boost::json::string("invalid stream " + s); + } + } + + return nullptr; +} + +void +subscribeToStreams( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::array const& streams = request.at("streams").as_array(); + + for (auto const& stream : streams) + { + std::string s = stream.as_string().c_str(); + + if (s == "ledger") + manager.subLedger(session); + else if (s == "transactions") + manager.subTransactions(session); + else + assert(false); + } +} + +void +unsubscribeToStreams( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::array const& streams = request.at("streams").as_array(); + + for (auto const& stream : streams) + { + std::string s = stream.as_string().c_str(); + + if (s == "ledger") + manager.unsubLedger(session); + else if (s == "transactions") + manager.unsubTransactions(session); + else + assert(false); + } +} + +boost::json::value +validateAccounts(boost::json::object const& request) +{ + if (!request.at("accounts").is_array()) + { + return "accounts must be array"; + } + + boost::json::array const& accounts = request.at("accounts").as_array(); + + for (auto const& account : accounts) + { + if (!account.is_string()) + { + return "account must be strings"; + } + + std::string s = account.as_string().c_str(); + auto id = accountFromStringStrict(s); + + if (!id) + { + return boost::json::string("invalid account " + s); + } + } + + return nullptr; +} + +void +subscribeToAccounts( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::array const& accounts = request.at("accounts").as_array(); + + for (auto const& account : accounts) + { + std::string s = account.as_string().c_str(); + + auto accountID = accountFromStringStrict(s); + + if(!accountID) + { + assert(false); + continue; + } + + manager.subAccount(*accountID, session); + } +} + +void +unsubscribeToAccounts( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::array const& accounts = request.at("accounts").as_array(); + + for (auto const& account : accounts) + { + std::string s = account.as_string().c_str(); + + auto accountID = accountFromStringStrict(s); + + if(!accountID) + { + assert(false); + continue; + } + + manager.unsubAccount(*accountID, session); + } +} + +boost::json::object +doSubscribe( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::object response; + + if (request.contains("streams")) + { + boost::json::value error = validateStreams(request); + + if(!error.is_null()) + { + response["error"] = error; + return response; + } + } + + if (request.contains("accounts")) + { + boost::json::value error = validateAccounts(request); + + if(!error.is_null()) + { + response["error"] = error; + return response; + } + } + + if (request.contains("streams")) + subscribeToStreams(request, session, manager); + + if (request.contains("accounts")) + subscribeToAccounts(request, session, manager); + + response["status"] = "success"; + return response; +} + +boost::json::object +doUnsubscribe( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::object response; + + if (request.contains("streams")) + { + boost::json::value error = validateStreams(request); + + if(!error.is_null()) + { + response["error"] = error; + return response; + } + } + + if (request.contains("accounts")) + { + boost::json::value error = validateAccounts(request); + + if(!error.is_null()) + { + response["error"] = error; + return response; + } + } + + if (request.contains("streams")) + unsubscribeToStreams(request, session, manager); + + if (request.contains("accounts")) + unsubscribeToAccounts(request, session, manager); + + response["status"] = "success"; + return response; +} \ No newline at end of file diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 44dd1448..503169c4 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -329,6 +329,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) { if (finishSequence_ && startSequence > *finishSequence_) return {}; + /* * Behold, mortals! This function spawns three separate threads, which talk * to each other via 2 different thread safe queues and 1 atomic variable. @@ -386,7 +387,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) { auto transformQueue = std::make_shared(maxQueueSize); queues.push_back(transformQueue); - std::cout << "added to queues"; extractors.emplace_back([this, &startSequence, diff --git a/reporting/server/session.cpp b/reporting/server/session.cpp new file mode 100644 index 00000000..83dff5ee --- /dev/null +++ b/reporting/server/session.cpp @@ -0,0 +1,48 @@ +#include + +void +fail(boost::beast::error_code ec, char const* what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +boost::json::object +buildResponse( + boost::json::object const& request, + BackendInterface const& backend, + SubscriptionManager& subManager, + std::shared_ptr session) +{ + std::string command = request.at("command").as_string().c_str(); + boost::json::object response; + switch (commandMap[command]) + { + case tx: + return doTx(request, backend); + break; + case account_tx: + return doAccountTx(request, backend); + break; + case book_offers: + return doBookOffers(request, backend); + break; + case ledger: + return doLedger(request, backend); + break; + case ledger_data: + return doLedgerData(request, backend); + break; + case account_info: + return doAccountInfo(request, backend); + break; + case subscribe: + return doSubscribe(request, session, subManager); + break; + case unsubscribe: + return doUnsubscribe(request, session, subManager); + break; + default: + BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; + } + return response; +} \ No newline at end of file diff --git a/reporting/server/session.h b/reporting/server/session.h new file mode 100644 index 00000000..6ad1795b --- /dev/null +++ b/reporting/server/session.h @@ -0,0 +1,243 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_REPORTING_SESSION_H +#define RIPPLE_REPORTING_SESSION_H + +#include +#include +#include + +#include + +#include + +class session; +class SubscriptionManager; + +static enum RPCCommand { tx, account_tx, ledger, account_info, book_offers, ledger_data, subscribe, unsubscribe }; +static std::unordered_map commandMap{ + {"tx", tx}, + {"account_tx", account_tx}, + {"ledger", ledger}, + {"account_info", account_info}, + {"book_offers", book_offers}, + {"ledger_data", ledger_data}, + {"subscribe", subscribe}, + {"unsubscribe", unsubscribe}}; + +boost::json::object +doAccountInfo( + boost::json::object const& request, + BackendInterface const& backend); +boost::json::object +doTx( + boost::json::object const& request, + BackendInterface const& backend); +boost::json::object +doAccountTx( + boost::json::object const& request, + BackendInterface const& backend); +boost::json::object +doBookOffers( + boost::json::object const& request, + BackendInterface const& backend); +boost::json::object +doLedgerData( + boost::json::object const& request, + BackendInterface const& backend); +boost::json::object +doLedger( + boost::json::object const& request, + BackendInterface const& backend); +boost::json::object +doSubscribe( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager); +boost::json::object +doUnsubscribe( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager); + +boost::json::object +buildResponse( + boost::json::object const& request, + BackendInterface const& backend, + SubscriptionManager& subManager, + std::shared_ptr session); + +void +fail(boost::beast::error_code ec, char const* what); + +// Echoes back all received WebSocket messages +class session : public std::enable_shared_from_this +{ + boost::beast::websocket::stream ws_; + boost::beast::flat_buffer buffer_; + std::string response_; + BackendInterface const& backend_; + SubscriptionManager& subscriptions_; + +public: + // Take ownership of the socket + explicit session( + boost::asio::ip::tcp::socket&& socket, + SubscriptionManager& subs, + BackendInterface const& backend) + : ws_(std::move(socket)) + , subscriptions_(subs) + , backend_(backend) + { + } + + // Get on the correct executor + void + run() + { + // We need to be executing within a strand to perform async operations + // on the I/O objects in this session. Although not strictly necessary + // for single-threaded contexts, this example code is written to be + // thread-safe by default. + boost::asio::dispatch( + ws_.get_executor(), + boost::beast::bind_front_handler( + &session::on_run, shared_from_this())); + } + + // Start the asynchronous operation + void + on_run() + { + // Set suggested timeout settings for the websocket + ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested( + boost::beast::role_type::server)); + + // Set a decorator to change the Server of the handshake + ws_.set_option(boost::beast::websocket::stream_base::decorator( + [](boost::beast::websocket::response_type& res) { + res.set( + boost::beast::http::field::server, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-server-async"); + })); + // Accept the websocket handshake + ws_.async_accept(boost::beast::bind_front_handler( + &session::on_accept, shared_from_this())); + } + + void + on_accept(boost::beast::error_code ec) + { + if (ec) + return fail(ec, "accept"); + + // Read a message + do_read(); + } + + void + do_read() + { + // Read a message into our buffer + ws_.async_read( + buffer_, + boost::beast::bind_front_handler( + &session::on_read, shared_from_this())); + } + + void + on_read(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == boost::beast::websocket::error::closed) + return; + + if (ec) + fail(ec, "read"); + std::string msg{ + static_cast(buffer_.data().data()), buffer_.size()}; + // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; + boost::json::object response; + try + { + boost::json::value raw = boost::json::parse(msg); + boost::json::object request = raw.as_object(); + BOOST_LOG_TRIVIAL(debug) << " received request : " << request; + try + { + response = buildResponse( + request, + backend_, + subscriptions_, + shared_from_this()); + } + catch (Backend::DatabaseTimeout const& t) + { + BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; + response["error"] = + "Database read timeout. Please retry the request"; + } + } + catch (std::exception const& e) + { + BOOST_LOG_TRIVIAL(error) + << __func__ << "caught exception : " << e.what(); + } + BOOST_LOG_TRIVIAL(trace) << __func__ << response; + response_ = boost::json::serialize(response); + + // Echo the message + ws_.text(ws_.got_text()); + ws_.async_write( + boost::asio::buffer(response_), + boost::beast::bind_front_handler( + &session::on_write, shared_from_this())); + } + + void + send(std::string&& msg) + { + ws_.text(ws_.got_text()); + ws_.async_write( + boost::asio::buffer(msg), + boost::beast::bind_front_handler( + &session::on_write, shared_from_this())); + } + + void + on_write(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) + return fail(ec, "write"); + + // Clear the buffer + buffer_.consume(buffer_.size()); + + // Do another read + do_read(); + } +}; + +#endif // RIPPLE_REPORTING_SESSION_H \ No newline at end of file