From 262cadf514d40b661e8f2f68dd8c1901ae0a69b4 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Wed, 26 May 2021 13:46:18 -0500 Subject: [PATCH] rebase session --- CMakeLists.txt | 1 + handlers/RPCHelpers.cpp | 47 ++++----- handlers/RPCHelpers.h | 4 + handlers/Subscribe.cpp | 116 ++++++++++++++++++++--- reporting/ETLSource.cpp | 73 +++++++------- reporting/ETLSource.h | 101 +++++++++++--------- reporting/P2pProxy.cpp | 53 ++++------- reporting/P2pProxy.h | 4 - reporting/ReportingETL.cpp | 1 + reporting/ReportingETL.h | 1 - reporting/server/SubscriptionManager.cpp | 43 +++++++++ reporting/server/SubscriptionManager.h | 17 ++++ reporting/server/listener.h | 11 +-- reporting/server/session.cpp | 8 +- reporting/server/session.h | 17 ++-- websocket_server_async.cpp | 5 +- 16 files changed, 336 insertions(+), 166 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b133b9f0..24cea9c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,6 +60,7 @@ target_sources(reporting PRIVATE reporting/PostgresBackend.cpp reporting/BackendIndexer.cpp reporting/Pg.cpp + reporting/P2pProxy.cpp reporting/DBHelpers.cpp reporting/ReportingETL.cpp reporting/server/session.cpp diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index 8a2a0577..ab209487 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -57,7 +57,6 @@ std::pair< std::shared_ptr> deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq) { -<<<<<<< HEAD auto [tx, meta] = deserializeTxPlusMeta(blobs); std::shared_ptr m = @@ -67,27 +66,6 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_ *meta); return {tx, m}; -======= - std::pair< - std::shared_ptr, - std::shared_ptr> - result; - { - ripple::SerialIter s{ - blobs.transaction.data(), blobs.transaction.size()}; - result.first = std::make_shared(s); - } - { - // ripple::Blob{blobs.metadata.data(), blobs.metadata.size()}; - - result.second = - std::make_shared( - result.first->getTransactionID(), - seq, - blobs.metadata); - } - return result; ->>>>>>> 5f429d4 (adds account subscription) } boost::json::object @@ -240,6 +218,7 @@ traverseOwnedNodes( return nextCursor; } + boost::optional parseRippleLibSeed(boost::json::value const& value) { @@ -400,3 +379,27 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error) return generateKeyPair(*keyType, *seed); } + +std::vector +getAccountsFromTransaction(boost::json::object const& transaction) +{ + std::vector accounts = {}; + for (auto const& [key, value] : transaction) + { + if (value.is_object()) + { + auto inObject = getAccountsFromTransaction(value.as_object()); + accounts.insert(accounts.end(), inObject.begin(), inObject.end()); + } + else if (value.is_string()) + { + auto account = accountFromStringStrict(value.as_string().c_str()); + if (account) + { + accounts.push_back(*account); + } + } + } + + return accounts; +} diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index ddafd0b7..a36e9290 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -44,9 +44,13 @@ traverseOwnedNodes( std::uint32_t sequence, ripple::uint256 const& cursor, std::function atOwnedNode); + std::pair keypairFromRequst( boost::json::object const& request, boost::json::value& error); + +std::vector +getAccountsFromTransaction(boost::json::object const& transaction); #endif diff --git a/handlers/Subscribe.cpp b/handlers/Subscribe.cpp index 002bc53e..0a5a20c0 100644 --- a/handlers/Subscribe.cpp +++ b/handlers/Subscribe.cpp @@ -4,7 +4,8 @@ static std::unordered_set validStreams { "ledger", - "transactions" }; + "transactions", + "transactions_proposed" }; boost::json::value validateStreams(boost::json::object const& request) @@ -50,6 +51,8 @@ subscribeToStreams( manager.subLedger(session); else if (s == "transactions") manager.subTransactions(session); + else if (s == "transactions_proposed") + manager.subProposedTransactions(session); else assert(false); } @@ -71,21 +74,18 @@ unsubscribeToStreams( manager.unsubLedger(session); else if (s == "transactions") manager.unsubTransactions(session); + else if (s == "transactions_proposed") + manager.unsubProposedTransactions(session); else assert(false); } } boost::json::value -validateAccounts(boost::json::object const& request) +validateAccounts( + boost::json::object const& request, + boost::json::array const& accounts) { - if (!request.at("accounts").is_array()) - { - return "accounts must be array"; - } - - boost::json::array const& accounts = request.at("accounts").as_array(); - for (auto const& account : accounts) { if (!account.is_string()) @@ -153,6 +153,55 @@ unsubscribeToAccounts( } } +void +subscribeToAccountsProposed( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::array const& accounts = request.at("accounts_proposed").as_array(); + + for (auto const& account : accounts) + { + std::string s = account.as_string().c_str(); + + auto accountID = ripple::parseBase58(s); + + if(!accountID) + { + assert(false); + continue; + } + + manager.subProposedAccount(*accountID, session); + } +} + +void +unsubscribeToAccountsProposed( + boost::json::object const& request, + std::shared_ptr& session, + SubscriptionManager& manager) +{ + boost::json::array const& accounts = request.at("accounts_proposed").as_array(); + + for (auto const& account : accounts) + { + std::string s = account.as_string().c_str(); + + auto accountID = ripple::parseBase58(s); + + if(!accountID) + { + assert(false); + continue; + } + + manager.unsubProposedAccount(*accountID, session); + } +} + + boost::json::object doSubscribe( boost::json::object const& request, @@ -174,7 +223,33 @@ doSubscribe( if (request.contains("accounts")) { - boost::json::value error = validateAccounts(request); + + if (!request.at("accounts").is_array()) + { + response["error"] = "accounts must be array"; + return response; + } + + boost::json::array accounts = request.at("accounts").as_array(); + boost::json::value error = validateAccounts(request, accounts); + + if(!error.is_null()) + { + response["error"] = error; + return response; + } + } + + if (request.contains("accounts_proposed")) + { + if (!request.at("accounts_proposed").is_array()) + { + response["error"] = "accounts_proposed must be array"; + return response; + } + + boost::json::array accounts = request.at("accounts_proposed").as_array(); + boost::json::value error = validateAccounts(request, accounts); if(!error.is_null()) { @@ -189,6 +264,9 @@ doSubscribe( if (request.contains("accounts")) subscribeToAccounts(request, session, manager); + if (request.contains("accounts_proposed")) + subscribeToAccountsProposed(request, session, manager); + response["status"] = "success"; return response; } @@ -214,7 +292,20 @@ doUnsubscribe( if (request.contains("accounts")) { - boost::json::value error = validateAccounts(request); + boost::json::array accounts = request.at("accounts").as_array(); + boost::json::value error = validateAccounts(request, accounts); + + if(!error.is_null()) + { + response["error"] = error; + return response; + } + } + + if (request.contains("accounts_proposed")) + { + boost::json::array accounts = request.at("accounts_proposed").as_array(); + boost::json::value error = validateAccounts(request, accounts); if(!error.is_null()) { @@ -229,6 +320,9 @@ doUnsubscribe( if (request.contains("accounts")) unsubscribeToAccounts(request, session, manager); + if (request.contains("accounts_proposed")) + unsubscribeToAccountsProposed(request, session, manager); + response["status"] = "success"; return response; } \ No newline at end of file diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index fc508de1..f044da24 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -21,10 +21,13 @@ #include #include +#include #include #include #include +#include #include +#include // Create ETL source without grpc endpoint // Fetch ledger and load initial ledger will fail for this source @@ -32,6 +35,7 @@ ETLSource::ETLSource( boost::json::object const& config, BackendInterface& backend, + ReportingETL& etl, NetworkValidatedLedgers& networkValidatedLedgers, boost::asio::io_context& ioContext) : ioc_(ioContext) @@ -39,6 +43,7 @@ ETLSource::ETLSource( boost::beast::websocket::stream>( boost::asio::make_strand(ioc_))) , resolver_(boost::asio::make_strand(ioc_)) + , etl_(etl) , timer_(ioc_) , networkValidatedLedgers_(networkValidatedLedgers) , backend_(backend) @@ -325,14 +330,10 @@ ETLSource::handleMessage() { if (response.contains("transaction")) { - /* - if - (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) + if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) { - etl_.getApplication().getOPs().forwardProposedTransaction( - response); + etl_.getSubscriptionManager().forwardProposedTransaction(response); } - */ } else { @@ -579,13 +580,15 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) ETLLoadBalancer::ETLLoadBalancer( boost::json::array const& config, BackendInterface& backend, + ReportingETL& etl, NetworkValidatedLedgers& nwvl, boost::asio::io_context& ioContext) + : etl_(etl) { for (auto& entry : config) { std::unique_ptr source = std::make_unique( - entry.as_object(), backend, nwvl, ioContext); + entry.as_object(), backend, etl, nwvl, ioContext); sources_.push_back(std::move(source)); BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " << sources_.back()->toString(); @@ -643,7 +646,6 @@ ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects) return {}; } -/* std::unique_ptr ETLLoadBalancer::getP2pForwardingStub() const { @@ -666,10 +668,10 @@ ETLLoadBalancer::getP2pForwardingStub() const return nullptr; } -Json::Value -ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const +boost::json::object +ETLLoadBalancer::forwardToP2p(boost::json::object const& request) const { - Json::Value res; + boost::json::object res; if (sources_.size() == 0) return res; srand((unsigned)time(0)); @@ -677,8 +679,9 @@ ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const auto numAttempts = 0; while (numAttempts < sources_.size()) { - res = sources_[sourceIdx]->forwardToP2p(context); - if (!res.isMember("forwarded") || res["forwarded"] != true) + res = sources_[sourceIdx]->forwardToP2p(request); + + if (!res.contains("forwarded") || res.at("forwarded") != true) { sourceIdx = (sourceIdx + 1) % sources_.size(); ++numAttempts; @@ -686,8 +689,7 @@ ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const } return res; } - RPC::Status err = {rpcFAILED_TO_FORWARD}; - err.inject(res); + res["error"] = "Failed to forward"; return res; } @@ -712,13 +714,13 @@ ETLSource::getP2pForwardingStub() const } } -Json::Value -ETLSource::forwardToP2p(RPC::JsonContext& context) const +boost::json::object +ETLSource::forwardToP2p(boost::json::object const& request) const { BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " - << "request = " << context.params.toStyledString(); + << "request = " << boost::json::serialize(request); - Json::Value response; + boost::json::object response; if (!connected_) { BOOST_LOG_TRIVIAL(error) @@ -728,9 +730,8 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const namespace beast = boost::beast; // from namespace http = beast::http; // from namespace websocket = beast::websocket; // from - namespace net = boost::asio; // from - using tcp = boost::asio::ip::tcp; // from - Json::Value& request = context.params; + namespace net = boost::asio; // from + using tcp = boost::asio::ip::tcp; // from try { // The io_context is required for all I/O @@ -753,42 +754,46 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const // and to tell rippled to charge the client IP for RPC // resources. See "secure_gateway" in // - https: // github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg + // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg ws->set_option(websocket::stream_base::decorator( - [&context](websocket::request_type& req) { + [&request](websocket::request_type& req) { req.set( http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro"); req.set( http::field::forwarded, - "for=" + context.consumer.to_string()); + "for=" + boost::json::serialize(request)); })); BOOST_LOG_TRIVIAL(debug) - << "client ip: " << context.consumer.to_string(); + << "client ip: " << boost::json::serialize(request); BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake"; // Perform the websocket handshake ws->handshake(ip_, "/"); - Json::FastWriter fastWriter; - BOOST_LOG_TRIVIAL(debug) << "Sending request"; // Send the message - ws->write(net::buffer(fastWriter.write(request))); + ws->write(net::buffer(boost::json::serialize(request))); beast::flat_buffer buffer; ws->read(buffer); + + auto begin = static_cast(buffer.data().data()); + auto end = begin + buffer.data().size(); + auto parsed = + boost::json::parse(std::string(begin, end)); - Json::Reader reader; - if (!reader.parse( - static_cast(buffer.data().data()), response)) + if (!parsed.is_object()) { BOOST_LOG_TRIVIAL(error) << "Error parsing response"; - response[jss::error] = "Error parsing response from tx"; + response["error"] = "Error parsing response from tx"; + return response; } BOOST_LOG_TRIVIAL(debug) << "Successfully forward request"; + response = parsed.as_object(); + response["forwarded"] = true; return response; } @@ -798,7 +803,7 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const return response; } } -*/ + template bool ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence) diff --git a/reporting/ETLSource.h b/reporting/ETLSource.h index 2b942b10..859edb1b 100644 --- a/reporting/ETLSource.h +++ b/reporting/ETLSource.h @@ -31,6 +31,8 @@ #include #include +class ReportingETL; + /// This class manages a connection to a single ETL source. This is almost /// always a p2p node, but really could be another reporting node. This class /// subscribes to the ledgers and transactions_proposed streams of the @@ -38,6 +40,7 @@ /// class also has methods for extracting said ledgers. Lastly this class /// forwards transactions received on the transactions_proposed streams to any /// subscribers. + class ETLSource { std::string ip_; @@ -62,6 +65,8 @@ class ETLSource NetworkValidatedLedgers& networkValidatedLedgers_; + ReportingETL& etl_; + // beast::Journal journal_; mutable std::mutex mtx_; @@ -114,6 +119,7 @@ public: ETLSource( boost::json::object const& config, BackendInterface& backend, + ReportingETL& etl, NetworkValidatedLedgers& networkValidatedLedgers, boost::asio::io_context& ioContext); @@ -212,6 +218,12 @@ public: ", grpc port : " + grpcPort_ + " }"; } + boost::json::value + toJson() const + { + return boost::json::string(toString()); + } + /// Download a ledger in full /// @param ledgerSequence sequence of the ledger to download /// @param writeQueue queue to push downloaded ledger objects @@ -262,12 +274,14 @@ public: void close(bool startAgain); - /* /// Get grpc stub to forward requests to p2p node /// @return stub to send requests to ETL source std::unique_ptr getP2pForwardingStub() const; - */ + + boost::json::object + forwardToP2p(boost::json::object const& request) const; + }; /// This class is used to manage connections to transaction processing processes /// This class spawns a listener for each etl source, which listens to messages @@ -278,7 +292,7 @@ public: class ETLLoadBalancer { private: - // ReportingETL& etl_; + ReportingETL& etl_; std::vector> sources_; @@ -286,6 +300,7 @@ public: ETLLoadBalancer( boost::json::array const& config, BackendInterface& backend, + ReportingETL& etl, NetworkValidatedLedgers& nwvl, boost::asio::io_context& ioContext); @@ -321,47 +336,47 @@ public: /// to clients). /// @param in ETLSource in question /// @return true if messages should be forwarded - // bool - // shouldPropagateTxnStream(ETLSource* in) const - // { - // for (auto& src : sources_) - // { - // assert(src); - // // We pick the first ETLSource encountered that is connected - // if (src->isConnected()) - // { - // if (src.get() == in) - // return true; - // else - // return false; - // } - // } - // - // // If no sources connected, then this stream has not been - // forwarded. return true; - // } + bool + shouldPropagateTxnStream(ETLSource* in) const + { + for (auto& src : sources_) + { + assert(src); + // We pick the first ETLSource encountered that is connected + if (src->isConnected()) + { + if (src.get() == in) + return true; + else + return false; + } + } + + // If no sources connected, then this stream has not been forwarded + return true; + } - // Json::Value - // toJson() const - // { - // Json::Value ret(Json::arrayValue); - // for (auto& src : sources_) - // { - // ret.append(src->toJson()); - // } - // return ret; - // } - // - // /// Randomly select a p2p node to forward a gRPC request to - // /// @return gRPC stub to forward requests to p2p node - // std::unique_ptr - // getP2pForwardingStub() const; - // - // /// Forward a JSON RPC request to a randomly selected p2p node - // /// @param context context of the request - // /// @return response received from p2p node - // Json::Value - // forwardToP2p(RPC::JsonContext& context) const; + boost::json::value + toJson() const + { + boost::json::array ret; + for (auto& src : sources_) + { + ret.push_back(src->toJson()); + } + return ret; + } + + /// Randomly select a p2p node to forward a gRPC request to + /// @return gRPC stub to forward requests to p2p node + std::unique_ptr + getP2pForwardingStub() const; + + /// Forward a JSON RPC request to a randomly selected p2p node + /// @param request JSON-RPC request + /// @return response received from p2p node + boost::json::object + forwardToP2p(boost::json::object const& request) const; private: /// f is a function that takes an ETLSource as an argument and returns a diff --git a/reporting/P2pProxy.cpp b/reporting/P2pProxy.cpp index af3893cf..dc2f29bc 100644 --- a/reporting/P2pProxy.cpp +++ b/reporting/P2pProxy.cpp @@ -17,24 +17,21 @@ */ //============================================================================== -#include -#include -#include -#include +#include +#include namespace ripple { -Json::Value -forwardToP2p(RPC::JsonContext& context) +boost::json::object +forwardToP2p(boost::json::object const& request, ReportingETL& etl) { - return context.app.getReportingETL().getETLLoadBalancer().forwardToP2p( - context); + return etl.getETLLoadBalancer().forwardToP2p(request); } std::unique_ptr -getP2pForwardingStub(RPC::Context& context) +getP2pForwardingStub(ReportingETL& etl) { - return context.app.getReportingETL() + return etl .getETLLoadBalancer() .getP2pForwardingStub(); } @@ -42,42 +39,34 @@ getP2pForwardingStub(RPC::Context& context) // We only forward requests where ledger_index is "current" or "closed" // otherwise, attempt to handle here bool -shouldForwardToP2p(RPC::JsonContext& context) +shouldForwardToP2p(boost::json::object const& request) { - if (!context.app.config().reporting()) - return false; + std::string strCommand = request.contains("command") + ? request.at("command").as_string().c_str() + : request.at("method").as_string().c_str(); - Json::Value& params = context.params; - std::string strCommand = params.isMember(jss::command) - ? params[jss::command].asString() - : params[jss::method].asString(); + BOOST_LOG_TRIVIAL(info) << "COMMAND:" << strCommand; + BOOST_LOG_TRIVIAL(info) << "REQUEST:" << request; - JLOG(context.j.trace()) << "COMMAND:" << strCommand; - JLOG(context.j.trace()) << "REQUEST:" << params; - auto handler = RPC::getHandler(context.apiVersion, strCommand); + auto handler = forwardCommands.find(strCommand) != forwardCommands.end(); if (!handler) { - JLOG(context.j.error()) + BOOST_LOG_TRIVIAL(error) << "Error getting handler. command = " << strCommand; return false; } - if (handler->condition_ == RPC::NEEDS_CURRENT_LEDGER || - handler->condition_ == RPC::NEEDS_CLOSED_LEDGER) + if (request.contains("ledger_index")) { - return true; - } - - if (params.isMember(jss::ledger_index)) - { - auto indexValue = params[jss::ledger_index]; - if (!indexValue.isNumeric()) + auto indexValue = request.at("ledger_index"); + if (!indexValue.is_uint64()) { - auto index = indexValue.asString(); + std::string index = indexValue.as_string().c_str(); return index == "current" || index == "closed"; } } - return false; + + return true; } } // namespace ripple diff --git a/reporting/P2pProxy.h b/reporting/P2pProxy.h index a3984018..ba782572 100644 --- a/reporting/P2pProxy.h +++ b/reporting/P2pProxy.h @@ -20,10 +20,6 @@ #ifndef RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED #define RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED -#include -#include -#include - #include #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 3cdc61c9..d11721bb 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -762,6 +762,7 @@ ReportingETL::ReportingETL( , loadBalancer_( config.at("etl_sources").as_array(), *flatMapBackend_, + *this, networkValidatedLedgers_, ioc) { diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 28e5db6f..f64359b5 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include diff --git a/reporting/server/SubscriptionManager.cpp b/reporting/server/SubscriptionManager.cpp index 0dd30054..76ca5272 100644 --- a/reporting/server/SubscriptionManager.cpp +++ b/reporting/server/SubscriptionManager.cpp @@ -88,3 +88,46 @@ SubscriptionManager::pubTransaction( for (auto const& session: accountSubscribers_[account]) session->send(boost::json::serialize(pubMsg)); } + + +void +SubscriptionManager::forwardProposedTransaction(boost::json::object const& response) +{ + for (auto const& session : streamSubscribers_[TransactionsProposed]) + session->send(boost::json::serialize(response)); + + auto transaction = response.at("transaction").as_object(); + auto accounts = getAccountsFromTransaction(transaction); + + for (ripple::AccountID const& account : accounts) + for (auto const& session: accountProposedSubscribers_[account]) + session->send(boost::json::serialize(response)); +} + +void +SubscriptionManager::subProposedAccount( + ripple::AccountID const& account, + std::shared_ptr& session) +{ + accountProposedSubscribers_[account].emplace(std::move(session)); +} + +void +SubscriptionManager::unsubProposedAccount( + ripple::AccountID const& account, + std::shared_ptr& session) +{ + accountProposedSubscribers_[account].erase(session); +} + +void +SubscriptionManager::subProposedTransactions(std::shared_ptr& session) +{ + streamSubscribers_[TransactionsProposed].emplace(std::move(session)); +} + +void +SubscriptionManager::unsubProposedTransactions(std::shared_ptr& session) +{ + streamSubscribers_[TransactionsProposed].erase(session); +} \ No newline at end of file diff --git a/reporting/server/SubscriptionManager.h b/reporting/server/SubscriptionManager.h index 06cfc9cf..86479c06 100644 --- a/reporting/server/SubscriptionManager.h +++ b/reporting/server/SubscriptionManager.h @@ -34,12 +34,14 @@ class SubscriptionManager enum SubscriptionType { Ledgers, Transactions, + TransactionsProposed, finalEntry }; std::array streamSubscribers_; std::unordered_map accountSubscribers_; + std::unordered_map accountProposedSubscribers_; public: @@ -70,6 +72,21 @@ public: void unsubAccount(ripple::AccountID const& account, std::shared_ptr& session); + + void + forwardProposedTransaction(boost::json::object const& response); + + void + subProposedAccount(ripple::AccountID const& account, std::shared_ptr& session); + + void + unsubProposedAccount(ripple::AccountID const& account, std::shared_ptr& session); + + void + subProposedTransactions(std::shared_ptr& session); + + void + unsubProposedTransactions(std::shared_ptr& session); }; #endif //SUBSCRIPTION_MANAGER_H \ No newline at end of file diff --git a/reporting/server/listener.h b/reporting/server/listener.h index 35624232..59b4ff48 100644 --- a/reporting/server/listener.h +++ b/reporting/server/listener.h @@ -36,19 +36,16 @@ class listener : public std::enable_shared_from_this { boost::asio::io_context& ioc_; boost::asio::ip::tcp::acceptor acceptor_; - BackendInterface const& backend_; - SubscriptionManager& subscriptions_; + ReportingETL& etl_; public: listener( boost::asio::io_context& ioc, boost::asio::ip::tcp::endpoint endpoint, - SubscriptionManager& subs, - BackendInterface const& backend) + ReportingETL& etl) : ioc_(ioc) , acceptor_(ioc) - , backend_(backend) - , subscriptions_(subs) + , etl_(etl) { boost::beast::error_code ec; @@ -113,7 +110,7 @@ private: else { // Create the session and run it - std::make_shared(std::move(socket), subscriptions_, backend_)->run(); + std::make_shared(std::move(socket), etl_)->run(); } // Accept another connection diff --git a/reporting/server/session.cpp b/reporting/server/session.cpp index 53672dd7..10c71c2c 100644 --- a/reporting/server/session.cpp +++ b/reporting/server/session.cpp @@ -16,6 +16,10 @@ buildResponse( std::string command = request.at("command").as_string().c_str(); BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; boost::json::object response; + + if (forwardCommands.find(command) != forwardCommands.end()) + return etl.getETLLoadBalancer().forwardToP2p(request); + switch (commandMap[command]) { case tx: @@ -53,7 +57,7 @@ buildResponse( case unsubscribe: return doUnsubscribe(request, session, manager); default: - BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; + response["error"] = "Unknown command: " + command; + return response; } - return response; } \ No newline at end of file diff --git a/reporting/server/session.h b/reporting/server/session.h index 9947d2e6..5c70c529 100644 --- a/reporting/server/session.h +++ b/reporting/server/session.h @@ -71,6 +71,11 @@ static std::unordered_map commandMap{ {"subscribe", subscribe}, {"unsubscribe", unsubscribe}}; +static std::unordered_set forwardCommands{ + "submit", + "fee" +}; + boost::json::object doTx( boost::json::object const& request, @@ -159,18 +164,15 @@ class session : public std::enable_shared_from_this boost::beast::websocket::stream ws_; boost::beast::flat_buffer buffer_; std::string response_; - BackendInterface const& backend_; - SubscriptionManager& subscriptions_; + ReportingETL& etl_; public: // Take ownership of the socket explicit session( boost::asio::ip::tcp::socket&& socket, - SubscriptionManager& subs, - BackendInterface const& backend) + ReportingETL& etl) : ws_(std::move(socket)) - , subscriptions_(subs) - , backend_(backend) + , etl_(etl) { } @@ -253,8 +255,7 @@ public: { response = buildResponse( request, - backend_, - subscriptions_, + etl_, shared_from_this()); } catch (Backend::DatabaseTimeout const& t) diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 1b98cc23..f17add28 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -41,9 +41,11 @@ parse_config(const char* filename) { try { + std::cout << "TRYING" << std::endl; std::ifstream in(filename, std::ios::in | std::ios::binary); if (in) { + std::cout << "GOT IN" << std::endl; std::stringstream contents; contents << in.rdbuf(); in.close(); @@ -134,8 +136,7 @@ main(int argc, char* argv[]) std::make_shared( ioc, boost::asio::ip::tcp::endpoint{address, port}, - etl.getSubscriptionManager(), - etl.getFlatMapBackend()) + etl) ->run(); // Run the I/O service on the requested number of threads