diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d49867c3..eb33ca589 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,7 +75,7 @@ target_sources(reporting PRIVATE backend/DBHelpers.cpp etl/ETLSource.cpp etl/ReportingETL.cpp - server/session.cpp + server/Handlers.cpp server/SubscriptionManager.cpp handlers/AccountInfo.cpp handlers/Tx.cpp diff --git a/backend/CassandraBackend.cpp b/backend/CassandraBackend.cpp index 30f20a88f..b6bf4ef25 100644 --- a/backend/CassandraBackend.cpp +++ b/backend/CassandraBackend.cpp @@ -1339,7 +1339,7 @@ CassandraBackend::open(bool readOnly) << std::to_string(rf) << "'} AND durable_writes = true"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "USE " << keyspace; if (!executeSimpleStatement(query.str())) continue; @@ -1578,7 +1578,8 @@ CassandraBackend::open(bool readOnly) "(?,null)"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; - query = {}; + + query.str(""); query << " update " << tablePrefix << "ledger_range" << " set sequence = ? where is_latest = false"; if (!deleteLedgerRange_.prepareStatement(query, session_.get())) diff --git a/etl/ReportingETL.cpp b/etl/ReportingETL.cpp index 96892af74..87ab9cb48 100644 --- a/etl/ReportingETL.cpp +++ b/etl/ReportingETL.cpp @@ -26,7 +26,7 @@ #include #include #include -#include +#include #include #include #include @@ -245,6 +245,8 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); assert(lgr); publishLedger(*lgr); + + return true; } } catch (Backend::DatabaseTimeout const& e) @@ -252,22 +254,6 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) continue; } - // publishStrand_.post([this, &ledger, &fees]() { - // subs_->pubLedger(*ledger, *fees); - // setLastPublish(); - // BOOST_LOG_TRIVIAL(info) - // << __func__ << " : " - // << "Published ledger. " << ledger->seq; - // }); -<<<<<<< HEAD:etl/ReportingETL.cpp - - publishLedger(ledger); -======= - - // publishLedger(ledger); ->>>>>>> f27312a (impliment flex websocket server):reporting/ReportingETL.cpp - - return true; } return false; } diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index 5ead801ff..96c99da57 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -92,7 +92,6 @@ getJson(Json::Value const& value) } boost::json::object -<<<<<<< HEAD toJson(ripple::TxMeta const& meta) { auto start = std::chrono::system_clock::now(); @@ -107,9 +106,6 @@ toJson(ripple::TxMeta const& meta) boost::json::object toJson(ripple::SLE const& sle) -======= -getJson(ripple::SLE const& sle) ->>>>>>> 03a0315 (compiles) { auto start = std::chrono::system_clock::now(); boost::json::value value = boost::json::parse( diff --git a/handlers/Subscribe.cpp b/handlers/Subscribe.cpp index c6de3fb89..33141c542 100644 --- a/handlers/Subscribe.cpp +++ b/handlers/Subscribe.cpp @@ -1,6 +1,7 @@ #include #include -#include +#include +#include static std::unordered_set validStreams{ "ledger", @@ -156,7 +157,7 @@ unsubscribeToAccounts( void subscribeToAccountsProposed( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::array const& accounts = @@ -181,7 +182,7 @@ subscribeToAccountsProposed( void unsubscribeToAccountsProposed( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::array const& accounts = diff --git a/reporting/server/Handlers.cpp b/reporting/server/Handlers.cpp deleted file mode 100644 index 010145830..000000000 --- a/reporting/server/Handlers.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include - -extern boost::json::object -buildResponse( - boost::json::object const& request, - ReportingETL& etl, - std::shared_ptr session) -{ - std::string command = request.at("command").as_string().c_str(); - BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; - boost::json::object response; - - BackendInterface& backend = etl.getFlatMapBackend(); - SubscriptionManager& manager = etl.getSubscriptionManager(); - switch (commandMap[command]) - { - case tx: - return doTx(request, backend); - case account_tx: - return doAccountTx(request, backend); - case ledger: - return doLedger(request, backend); - case ledger_entry: - return doLedgerEntry(request, backend); - case ledger_range: - return doLedgerRange(request, backend); - case ledger_data: - return doLedgerData(request, backend); - case account_info: - return doAccountInfo(request, backend); - case book_offers: - return doBookOffers(request, backend); - case account_channels: - return doAccountChannels(request, backend); - case account_lines: - return doAccountLines(request, backend); - case account_currencies: - return doAccountCurrencies(request, backend); - case account_offers: - return doAccountOffers(request, backend); - case account_objects: - return doAccountObjects(request, backend); - case channel_authorize: - return doChannelAuthorize(request); - case channel_verify: - return doChannelVerify(request); - case subscribe: - return doSubscribe(request, session, manager); - case unsubscribe: - return doUnsubscribe(request, session, manager); - default: - BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; - } - return response; -} \ No newline at end of file diff --git a/reporting/server/Listener.h b/reporting/server/Listener.h deleted file mode 100644 index 9f9d0ce83..000000000 --- a/reporting/server/Listener.h +++ /dev/null @@ -1,194 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2020 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef LISTENER_H -#define LISTENER_H - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -class SubscriptionManager; - -template -class Detector : public std::enable_shared_from_this> -{ - using std::enable_shared_from_this>::shared_from_this; - - boost::beast::tcp_stream stream_; - ssl::context& ctx_; - ReportingETL& etl_; - boost::beast::flat_buffer buffer_; - -public: - Detector( - tcp::socket&& socket, - ssl::context& ctx, - ReportingETL& etl) - : stream_(std::move(socket)) - , ctx_(ctx) - , etl_(etl) - { - } - - // Launch the detector - void - run() - { - // Set the timeout. - boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); - // Detect a TLS handshake - async_detect_ssl( - stream_, - buffer_, - boost::beast::bind_front_handler( - &Detector::on_detect, - shared_from_this())); - } - - void - on_detect(boost::beast::error_code ec, bool result) - { - if(ec) - return httpFail(ec, "detect"); - - if(result) - { - // Launch SSL session - std::make_shared( - stream_.release_socket(), - ctx_, - etl_, - std::move(buffer_))->run(); - return; - } - - // Launch plain session - std::make_shared( - stream_.release_socket(), - etl_, - std::move(buffer_))->run(); - } -}; - -template -class Listener : public std::enable_shared_from_this> -{ - using std::enable_shared_from_this>::shared_from_this; - - net::io_context& ioc_; - ssl::context& ctx_; - tcp::acceptor acceptor_; - ReportingETL& etl_; - -public: - Listener( - net::io_context& ioc, - ssl::context& ctx, - tcp::endpoint endpoint, - ReportingETL& etl) - : ioc_(ioc) - , ctx_(ctx) - , acceptor_(net::make_strand(ioc)) - , etl_(etl) - { - boost::beast::error_code ec; - - // Open the acceptor - acceptor_.open(endpoint.protocol(), ec); - if(ec) - { - httpFail(ec, "open"); - return; - } - - // Allow address reuse - acceptor_.set_option(net::socket_base::reuse_address(true), ec); - if(ec) - { - httpFail(ec, "set_option"); - return; - } - - // Bind to the server address - acceptor_.bind(endpoint, ec); - if(ec) - { - httpFail(ec, "bind"); - return; - } - - // Start listening for connections - acceptor_.listen( - net::socket_base::max_listen_connections, ec); - if(ec) - { - httpFail(ec, "listen"); - return; - } - } - - // Start accepting incoming connections - void - run() - { - do_accept(); - } - -private: - void - do_accept() - { - // The new connection gets its own strand - acceptor_.async_accept( - net::make_strand(ioc_), - boost::beast::bind_front_handler( - &Listener::on_accept, - shared_from_this())); - } - - void - on_accept(boost::beast::error_code ec, tcp::socket socket) - { - if(ec) - { - httpFail(ec, "listener_accept"); - } - else - { - // Create the detector session and run it - std::make_shared>( - std::move(socket), - ctx_, - etl_)->run(); - } - - // Accept another connection - do_accept(); - } -}; - -#endif // LISTENER_H \ No newline at end of file diff --git a/reporting/server/SubscriptionManager.cpp b/reporting/server/SubscriptionManager.cpp deleted file mode 100644 index ecdd7bdcc..000000000 --- a/reporting/server/SubscriptionManager.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include -#include -#include - -void -SubscriptionManager::subLedger(std::shared_ptr& session) -{ - streamSubscribers_[Ledgers].emplace(std::move(session)); -} - -void -SubscriptionManager::unsubLedger(std::shared_ptr& session) -{ - streamSubscribers_[Ledgers].erase(session); -} - -void -SubscriptionManager::pubLedger( - ripple::LedgerInfo const& lgrInfo, - ripple::Fees const& fees, - std::string const& ledgerRange, - std::uint32_t txnCount) -{ - boost::json::object pubMsg; - - pubMsg["type"] = "ledgerClosed"; - pubMsg["ledger_index"] = lgrInfo.seq; - pubMsg["ledger_hash"] = to_string(lgrInfo.hash); - pubMsg["ledger_time"] = lgrInfo.closeTime.time_since_epoch().count(); - - pubMsg["fee_ref"] = getJson(fees.units.jsonClipped()); - pubMsg["fee_base"] = getJson(fees.base.jsonClipped()); - pubMsg["reserve_base"] = getJson(fees.accountReserve(0).jsonClipped()); - pubMsg["reserve_inc"] = getJson(fees.increment.jsonClipped()); - - pubMsg["validated_ledgers"] = ledgerRange; - pubMsg["txn_count"] = txnCount; - - for (auto const& session: streamSubscribers_[Ledgers]) - session->send(boost::json::serialize(pubMsg)); -} - -void -SubscriptionManager::subTransactions(std::shared_ptr& session) -{ - streamSubscribers_[Transactions].emplace(std::move(session)); -} - -void -SubscriptionManager::unsubTransactions(std::shared_ptr& session) -{ - streamSubscribers_[Transactions].erase(session); -} - -void -SubscriptionManager::subAccount( - ripple::AccountID const& account, - std::shared_ptr& session) -{ - accountSubscribers_[account].emplace(std::move(session)); -} - -void -SubscriptionManager::unsubAccount( - ripple::AccountID const& account, - std::shared_ptr& session) -{ - accountSubscribers_[account].erase(session); -} - -void -SubscriptionManager::pubTransaction( - Backend::TransactionAndMetadata const& blob, - std::uint32_t seq) -{ - auto [tx, meta] = deserializeTxPlusMeta(blob, seq); - - boost::json::object pubMsg; - pubMsg["transaction"] = getJson(*tx); - pubMsg["meta"] = getJson(*meta); - - for (auto const& session: streamSubscribers_[Transactions]) - session->send(boost::json::serialize(pubMsg)); - - auto journal = ripple::debugLog(); - auto accounts = meta->getAffectedAccounts(journal); - - for (ripple::AccountID const& account : accounts) - for (auto const& session: accountSubscribers_[account]) - session->send(boost::json::serialize(pubMsg)); -} diff --git a/server/session.cpp b/server/Handlers.cpp similarity index 96% rename from server/session.cpp rename to server/Handlers.cpp index 97dd0206e..9b6e3ab78 100644 --- a/server/session.cpp +++ b/server/Handlers.cpp @@ -1,10 +1,4 @@ -#include - -void -fail(boost::beast::error_code ec, char const* what) -{ - std::cerr << what << ": " << ec.message() << "\n"; -} +#include bool shouldForwardToRippled(boost::json::object const& request) @@ -35,13 +29,14 @@ shouldForwardToRippled(boost::json::object const& request) return false; } + std::pair buildResponse( boost::json::object const& request, std::shared_ptr backend, std::shared_ptr manager, std::shared_ptr balancer, - std::shared_ptr session) + std::shared_ptr session) { std::string command = request.at("command").as_string().c_str(); BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; diff --git a/reporting/server/Handlers.h b/server/Handlers.h similarity index 84% rename from reporting/server/Handlers.h rename to server/Handlers.h index 8da8f8198..99522d9cf 100644 --- a/reporting/server/Handlers.h +++ b/server/Handlers.h @@ -7,8 +7,8 @@ #include #include -#include -#include +#include +#include #include #include @@ -21,6 +21,16 @@ class SubscriptionManager; class WsSession; //------------------------------------------------------------------------------ + +static std::unordered_set forwardCommands{ + "submit", + "submit_multisigned", + "fee", + "path_find", + "ripple_path_find", + "manifest" +}; + enum RPCCommand { tx, account_tx, @@ -38,7 +48,8 @@ enum RPCCommand { channel_authorize, channel_verify, subscribe, - unsubscribe + unsubscribe, + server_info }; static std::unordered_map commandMap{ @@ -58,7 +69,8 @@ static std::unordered_map commandMap{ {"channel_authorize", channel_authorize}, {"channel_verify", channel_verify}, {"subscribe", subscribe}, - {"unsubscribe", unsubscribe}}; + {"unsubscribe", unsubscribe}, + {"server_info", server_info}}; boost::json::object doTx( @@ -121,6 +133,11 @@ doChannelAuthorize(boost::json::object const& request); boost::json::object doChannelVerify(boost::json::object const& request); +boost::json::object +doServerInfo( + boost::json::object const& request, + BackendInterface const& backend); + boost::json::object doSubscribe( boost::json::object const& request, @@ -132,10 +149,12 @@ doUnsubscribe( std::shared_ptr& session, SubscriptionManager& manager); -extern boost::json::object +std::pair buildResponse( boost::json::object const& request, - ReportingETL& etl, + std::shared_ptr backend, + std::shared_ptr manager, + std::shared_ptr balancer, std::shared_ptr session); #endif // RIPPLE_REPORTING_HANDLERS_H diff --git a/reporting/server/HttpBase.h b/server/HttpBase.h similarity index 90% rename from reporting/server/HttpBase.h rename to server/HttpBase.h index 89403ed14..9864a7280 100644 --- a/reporting/server/HttpBase.h +++ b/server/HttpBase.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,8 @@ #include #include -#include +#include +#include #include namespace http = boost::beast::http; @@ -109,7 +111,9 @@ void handle_request( boost::beast::http::request>&& req, Send&& send, - ReportingETL& etl) + std::shared_ptr backend, + std::shared_ptr balancer, + DOSGuard& dosGuard) { auto const response = [&req]( @@ -181,7 +185,12 @@ handle_request( std::cout << "Transfromed to ws style stuff" << std::endl; - auto builtResponse = buildResponse(wsStyleRequest, etl, nullptr); + auto [builtResponse, cost] = buildResponse( + wsStyleRequest, + backend, + nullptr, + balancer, + nullptr); send(response( http::status::ok, @@ -252,15 +261,23 @@ class HttpBase http::request req_; std::shared_ptr res_; - ReportingETL& etl_; + std::shared_ptr backend_; + std::shared_ptr balancer_; + DOSGuard& dosGuard_; send_lambda lambda_; protected: boost::beast::flat_buffer buffer_; public: - HttpBase(ReportingETL& etl, boost::beast::flat_buffer buffer) - : etl_(etl) + HttpBase( + std::shared_ptr backend, + std::shared_ptr balancer, + DOSGuard& dosGuard, + boost::beast::flat_buffer buffer) + : backend_(backend) + , balancer_(balancer) + , dosGuard_(dosGuard) , lambda_(*this) , buffer_(std::move(buffer)) {} @@ -299,8 +316,10 @@ public: if(ec) return httpFail(ec, "read"); + auto ip = derived().ip(); + // Send the response - handle_request(std::move(req_), lambda_, etl_); + handle_request(std::move(req_), lambda_, backend_, balancer_, dosGuard_); } void diff --git a/reporting/server/HttpSession.h b/server/HttpSession.h similarity index 84% rename from reporting/server/HttpSession.h rename to server/HttpSession.h index 315aef797..929b57cbb 100644 --- a/reporting/server/HttpSession.h +++ b/server/HttpSession.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_REPORTING_HTTP_SESSION_H #define RIPPLE_REPORTING_HTTP_SESSION_H -#include +#include namespace http = boost::beast::http; namespace net = boost::asio; @@ -38,9 +38,12 @@ public: explicit HttpSession( tcp::socket&& socket, - ReportingETL& etl, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard, boost::beast::flat_buffer buffer) - : HttpBase(etl, std::move(buffer)) + : HttpBase(backend, balancer, dosGuard, std::move(buffer)) , stream_(std::move(socket)) {} @@ -50,6 +53,12 @@ public: return stream_; } + std::string + ip() + { + return stream_.socket().remote_endpoint().address().to_string(); + } + // Start the asynchronous operation void run() diff --git a/reporting/server/WsSession.h b/server/PlainWsSession.h similarity index 90% rename from reporting/server/WsSession.h rename to server/PlainWsSession.h index 0a7d6cd66..5b1200368 100644 --- a/reporting/server/WsSession.h +++ b/server/PlainWsSession.h @@ -26,9 +26,9 @@ #include #include -#include -#include -#include +#include +#include +#include #include @@ -69,42 +69,6 @@ public: { } - static void - make_session( - boost::asio::ip::tcp::socket&& socket, - std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr balancer, - DOSGuard& dosGuard) - { - std::make_shared( - std::move(socket), backend, subscriptions, balancer, dosGuard) - ->run(); - } - - void - send(std::string&& msg) - { - ws_.text(ws_.got_text()); - ws_.async_write( - boost::asio::buffer(msg), - boost::beast::bind_front_handler( - &session::on_write, shared_from_this())); - } - - void - close(boost::beast::websocket::close_reason const& cr) - { - boost::beast::error_code ec; - - ws_.close(cr, ec); - - if (ec) - return fail(ec, "close"); - } - -private: - // Get on the correct executor void send(std::string&& msg) { @@ -141,6 +105,8 @@ private: &WsSession::on_accept, shared_from_this())); } + +private: void on_accept(boost::beast::error_code ec) @@ -169,7 +135,6 @@ private: void on_read(boost::beast::error_code ec, std::size_t bytes_transferred) { - std::cout << "readed WS" << std::endl; boost::ignore_unused(bytes_transferred); // This indicates that the session was closed @@ -271,15 +236,23 @@ class WsUpgrader : public std::enable_shared_from_this boost::beast::tcp_stream http_; boost::optional> parser_; boost::beast::flat_buffer buffer_; - ReportingETL& etl_; - + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; + DOSGuard& dosGuard_; public: WsUpgrader( boost::asio::ip::tcp::socket&& socket, - ReportingETL& etl, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard, boost::beast::flat_buffer&& b) : http_(std::move(socket)) - , etl_(etl) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) + , dosGuard_(dosGuard) , buffer_(std::move(b)) {} @@ -346,8 +319,10 @@ private: std::make_shared( http_.release_socket(), - etl_, - std::move(buffer_))->run(parser_->release()); + backend_, + subscriptions_, + balancer_, + dosGuard_)->run(parser_->release()); } }; diff --git a/reporting/server/SslHttpSession.h b/server/SslHttpSession.h similarity index 87% rename from reporting/server/SslHttpSession.h rename to server/SslHttpSession.h index 102a1aa73..f92fabf6b 100644 --- a/reporting/server/SslHttpSession.h +++ b/server/SslHttpSession.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_REPORTING_HTTPS_SESSION_H #define RIPPLE_REPORTING_HTTPS_SESSION_H -#include +#include namespace http = boost::beast::http; namespace net = boost::asio; @@ -39,9 +39,12 @@ public: SslHttpSession( tcp::socket&& socket, ssl::context& ctx, - ReportingETL& etl, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard, boost::beast::flat_buffer buffer) - : HttpBase(etl, std::move(buffer)) + : HttpBase(backend, balancer, dosGuard, std::move(buffer)) , stream_(std::move(socket), ctx) {} @@ -51,6 +54,12 @@ public: return stream_; } + std::string + ip() + { + return stream_.next_layer().socket().remote_endpoint().address().to_string(); + } + // Start the asynchronous operation void run() diff --git a/reporting/server/SslWsSession.h b/server/SslWsSession.h similarity index 69% rename from reporting/server/SslWsSession.h rename to server/SslWsSession.h index a56267676..74e03df11 100644 --- a/reporting/server/SslWsSession.h +++ b/server/SslWsSession.h @@ -26,14 +26,15 @@ #include #include -#include -#include +#include +#include -#include +#include namespace http = boost::beast::http; namespace net = boost::asio; -namespace ssl = boost::asio::ssl; +namespace ssl = boost::asio::ssl; +namespace websocket = boost::beast::websocket; using tcp = boost::asio::ip::tcp; class ReportingETL; @@ -46,17 +47,26 @@ class SslWsSession : public WsBase std::string response_; boost::beast::flat_buffer buffer_; http::request_parser parser_; - ReportingETL& etl_; + std::shared_ptr backend_; + std::weak_ptr subscriptions_; + std::shared_ptr balancer_; + DOSGuard& dosGuard_; public: // Take ownership of the socket explicit SslWsSession( boost::beast::ssl_stream&& stream, - ReportingETL& etl, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard, boost::beast::flat_buffer b) : WsBase() , ws_(std::move(stream)) - , etl_(etl) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) + , dosGuard_(dosGuard) { } @@ -128,34 +138,64 @@ public: if (ec) wsFail(ec, "read"); + std::string msg{ static_cast(buffer_.data().data()), buffer_.size()}; // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; boost::json::object response; - try + auto ip = + ws_.next_layer().next_layer().socket().remote_endpoint().address().to_string(); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " received request from ip = " << ip; + if (!dosGuard_.isOk(ip)) + response["error"] = "Too many requests. Slow down"; + else { - boost::json::value raw = boost::json::parse(msg); - boost::json::object request = raw.as_object(); - BOOST_LOG_TRIVIAL(debug) << " received request : " << request; try { - response = buildResponse( - request, - etl_, - shared_from_this()); + boost::json::value raw = boost::json::parse(msg); + boost::json::object request = raw.as_object(); + BOOST_LOG_TRIVIAL(debug) << " received request : " << request; + try + { + std::shared_ptr subPtr = + subscriptions_.lock(); + if (!subPtr) + return; + + auto [res, cost] = buildResponse( + request, + backend_, + subPtr, + balancer_, + shared_from_this()); + auto start = std::chrono::system_clock::now(); + response = std::move(res); + if (!dosGuard_.add(ip, cost)) + { + response["warning"] = "Too many requests"; + } + + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << " RPC call took " + << ((end - start).count() / 1000000000.0) + << " . request = " << request; + } + catch (Backend::DatabaseTimeout const& t) + { + BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; + response["error"] = + "Database read timeout. Please retry the request"; + } } - catch (Backend::DatabaseTimeout const& t) + catch (std::exception const& e) { - BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; - response["error"] = - "Database read timeout. Please retry the request"; + BOOST_LOG_TRIVIAL(error) + << __func__ << "caught exception : " << e.what(); + response["error"] = "Unknown exception"; } } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(error) - << __func__ << "caught exception : " << e.what(); - } BOOST_LOG_TRIVIAL(trace) << __func__ << response; response_ = boost::json::serialize(response); @@ -189,17 +229,25 @@ class SslWsUpgrader : public std::enable_shared_from_this boost::optional> parser_; boost::beast::flat_buffer buffer_; ssl::context& ctx_; - ReportingETL& etl_; - + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; + DOSGuard& dosGuard_; public: SslWsUpgrader( boost::asio::ip::tcp::socket&& socket, ssl::context& ctx, - ReportingETL& etl, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard, boost::beast::flat_buffer&& b) : https_(std::move(socket), ctx) , ctx_(ctx) - , etl_(etl) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) + , dosGuard_(dosGuard) , buffer_(std::move(b)) {} @@ -281,7 +329,10 @@ private: std::make_shared( std::move(https_), - etl_, + backend_, + subscriptions_, + balancer_, + dosGuard_, std::move(buffer_))->run(parser_->release()); } }; diff --git a/server/SubscriptionManager.cpp b/server/SubscriptionManager.cpp index 03517cfc6..74b51bdf7 100644 --- a/server/SubscriptionManager.cpp +++ b/server/SubscriptionManager.cpp @@ -2,13 +2,13 @@ #include void -SubscriptionManager::subLedger(std::shared_ptr& session) +SubscriptionManager::subLedger(std::shared_ptr& session) { streamSubscribers_[Ledgers].emplace(std::move(session)); } void -SubscriptionManager::unsubLedger(std::shared_ptr& session) +SubscriptionManager::unsubLedger(std::shared_ptr& session) { streamSubscribers_[Ledgers].erase(session); } @@ -40,13 +40,13 @@ SubscriptionManager::pubLedger( } void -SubscriptionManager::subTransactions(std::shared_ptr& session) +SubscriptionManager::subTransactions(std::shared_ptr& session) { streamSubscribers_[Transactions].emplace(std::move(session)); } void -SubscriptionManager::unsubTransactions(std::shared_ptr& session) +SubscriptionManager::unsubTransactions(std::shared_ptr& session) { streamSubscribers_[Transactions].erase(session); } @@ -54,7 +54,7 @@ SubscriptionManager::unsubTransactions(std::shared_ptr& session) void SubscriptionManager::subAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr& session) { accountSubscribers_[account].emplace(std::move(session)); } @@ -62,7 +62,7 @@ SubscriptionManager::subAccount( void SubscriptionManager::unsubAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr& session) { accountSubscribers_[account].erase(session); } @@ -107,7 +107,7 @@ SubscriptionManager::forwardProposedTransaction( void SubscriptionManager::subProposedAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr& session) { accountProposedSubscribers_[account].emplace(std::move(session)); } @@ -115,20 +115,20 @@ SubscriptionManager::subProposedAccount( void SubscriptionManager::unsubProposedAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr& session) { accountProposedSubscribers_[account].erase(session); } void -SubscriptionManager::subProposedTransactions(std::shared_ptr& session) +SubscriptionManager::subProposedTransactions(std::shared_ptr& session) { streamSubscribers_[TransactionsProposed].emplace(std::move(session)); } void SubscriptionManager::unsubProposedTransactions( - std::shared_ptr& session) + std::shared_ptr& session) { streamSubscribers_[TransactionsProposed].erase(session); } diff --git a/server/SubscriptionManager.h b/server/SubscriptionManager.h index 17491bd53..0e1c61cee 100644 --- a/server/SubscriptionManager.h +++ b/server/SubscriptionManager.h @@ -20,11 +20,11 @@ #ifndef SUBSCRIPTION_MANAGER_H #define SUBSCRIPTION_MANAGER_H -#include +#include #include -#include -#include +#include +#include class SubscriptionManager { @@ -77,12 +77,12 @@ public: void subAccount( ripple::AccountID const& account, - std::shared_ptr& session); + std::shared_ptr& session); void unsubAccount( ripple::AccountID const& account, - std::shared_ptr& session); + std::shared_ptr& session); void forwardProposedTransaction(boost::json::object const& response); @@ -90,18 +90,18 @@ public: void subProposedAccount( ripple::AccountID const& account, - std::shared_ptr& session); + std::shared_ptr& session); void unsubProposedAccount( ripple::AccountID const& account, - std::shared_ptr& session); + std::shared_ptr& session); void - subProposedTransactions(std::shared_ptr& session); + subProposedTransactions(std::shared_ptr& session); void - unsubProposedTransactions(std::shared_ptr& session); + unsubProposedTransactions(std::shared_ptr& session); }; #endif // SUBSCRIPTION_MANAGER_H diff --git a/reporting/server/WsBase.h b/server/WsBase.h similarity index 100% rename from reporting/server/WsBase.h rename to server/WsBase.h diff --git a/server/listener.h b/server/listener.h index ba72a019a..45480a3bc 100644 --- a/server/listener.h +++ b/server/listener.h @@ -23,11 +23,11 @@ #include #include #include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include #include @@ -40,17 +40,26 @@ class Detector : public std::enable_shared_from_this backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; + DOSGuard& dosGuard_; boost::beast::flat_buffer buffer_; public: Detector( tcp::socket&& socket, ssl::context& ctx, - ReportingETL& etl) + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard) : stream_(std::move(socket)) , ctx_(ctx) - , etl_(etl) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) + , dosGuard_(dosGuard) { } @@ -81,7 +90,10 @@ public: std::make_shared( stream_.release_socket(), ctx_, - etl_, + backend_, + subscriptions_, + balancer_, + dosGuard_, std::move(buffer_))->run(); return; } @@ -89,7 +101,10 @@ public: // Launch plain session std::make_shared( stream_.release_socket(), - etl_, + backend_, + subscriptions_, + balancer_, + dosGuard_, std::move(buffer_))->run(); } }; @@ -102,18 +117,27 @@ class Listener : public std::enable_shared_from_this backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; + DOSGuard& dosGuard_; public: Listener( net::io_context& ioc, ssl::context& ctx, tcp::endpoint endpoint, - ReportingETL& etl) + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard) : ioc_(ioc) , ctx_(ctx) , acceptor_(net::make_strand(ioc)) - , etl_(etl) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) + , dosGuard_(dosGuard) { boost::beast::error_code ec; @@ -151,15 +175,14 @@ public: } } - ~listener() = default; - -private: + // Start accepting incoming connections void run() { do_accept(); } +private: void do_accept() { @@ -184,7 +207,10 @@ private: std::make_shared>( std::move(socket), ctx_, - etl_)->run(); + backend_, + subscriptions_, + balancer_, + dosGuard_)->run(); } // Accept another connection @@ -192,4 +218,76 @@ private: } }; -#endif // LISTENER_H +namespace Server +{ + using WebsocketServer = Listener; + using HttpServer = Listener; + + static std::shared_ptr + make_WebSocketServer( + boost::json::object const& config, + boost::asio::io_context& ioc, + ssl::context& ctx, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard) + { + if (!config.contains("websocket_public")) + return nullptr; + + auto const& wsConfig = config.at("websocket_public").as_object(); + + auto const address = + boost::asio::ip::make_address(wsConfig.at("ip").as_string().c_str()); + auto const port = + static_cast(wsConfig.at("port").as_int64()); + + auto server = std::make_shared( + ioc, + ctx, + boost::asio::ip::tcp::endpoint{address, port}, + backend, + subscriptions, + balancer, + dosGuard); + + server->run(); + return server; + } + + static std::shared_ptr + make_HttpServer( + boost::json::object const& config, + boost::asio::io_context& ioc, + ssl::context& ctx, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + DOSGuard& dosGuard) + { + if (!config.contains("http_public")) + return nullptr; + + auto const& httpConfig = config.at("http_public").as_object(); + + auto const address = + boost::asio::ip::make_address(httpConfig.at("ip").as_string().c_str()); + auto const port = + static_cast(httpConfig.at("port").as_int64()); + + auto server = std::make_shared( + ioc, + ctx, + boost::asio::ip::tcp::endpoint{address, port}, + backend, + subscriptions, + balancer, + dosGuard); + + server->run(); + return server; + } +} + +#endif // LISTENER_H \ No newline at end of file diff --git a/server/session.h b/server/session.h deleted file mode 100644 index 62d9fd350..000000000 --- a/server/session.h +++ /dev/null @@ -1,398 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2020 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_REPORTING_SESSION_H -#define RIPPLE_REPORTING_SESSION_H - -#include -#include -#include - -#include -#include -#include -#include - -class SubscriptionManager; -class ETLLoadBalancer; - -//------------------------------------------------------------------------------ -enum RPCCommand { - tx, - account_tx, - ledger, - account_info, - ledger_data, - book_offers, - ledger_range, - ledger_entry, - account_channels, - account_lines, - account_currencies, - account_offers, - account_objects, - channel_authorize, - channel_verify, - server_info, - subscribe, - unsubscribe -}; - -static std::unordered_map commandMap{ - {"tx", tx}, - {"account_tx", account_tx}, - {"ledger", ledger}, - {"ledger_range", ledger_range}, - {"ledger_entry", ledger_entry}, - {"account_info", account_info}, - {"ledger_data", ledger_data}, - {"book_offers", book_offers}, - {"account_channels", account_channels}, - {"account_lines", account_lines}, - {"account_currencies", account_currencies}, - {"account_offers", account_offers}, - {"account_objects", account_objects}, - {"channel_authorize", channel_authorize}, - {"channel_verify", channel_verify}, - {"server_info", server_info}, - {"subscribe", subscribe}, - {"unsubscribe", unsubscribe}}; - -static std::unordered_set forwardCommands{ - "submit", - "submit_multisigned", - "fee", - "path_find", - "ripple_path_find", - "manifest"}; - -boost::json::object -doTx(boost::json::object const& request, BackendInterface const& backend); -boost::json::object -doAccountTx( - boost::json::object const& request, - BackendInterface const& backend); - -boost::json::object -doBookOffers( - boost::json::object const& request, - BackendInterface const& backend); - -boost::json::object -doLedgerData( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doLedgerEntry( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doLedger(boost::json::object const& request, BackendInterface const& backend); - -boost::json::object -doLedgerRange( - boost::json::object const& request, - BackendInterface const& backend); - -boost::json::object -doAccountInfo( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doAccountChannels( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doAccountLines( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doAccountCurrencies( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doAccountOffers( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doAccountObjects( - boost::json::object const& request, - BackendInterface const& backend); -boost::json::object -doServerInfo( - boost::json::object const& request, - BackendInterface const& backend); - -boost::json::object -doChannelAuthorize(boost::json::object const& request); -boost::json::object -doChannelVerify(boost::json::object const& request); - -boost::json::object -doSubscribe( - boost::json::object const& request, - std::shared_ptr& session, - SubscriptionManager& manager); -boost::json::object -doUnsubscribe( - boost::json::object const& request, - std::shared_ptr& session, - SubscriptionManager& manager); - -std::pair -buildResponse( - boost::json::object const& request, - std::shared_ptr backend, - std::shared_ptr manager, - std::shared_ptr balancer, - std::shared_ptr session); - -void -fail(boost::beast::error_code ec, char const* what); - -// Echoes back all received WebSocket messages -class WsSession : public std::enable_shared_from_this -{ - boost::beast::websocket::stream ws_; - boost::beast::flat_buffer buffer_; - std::string response_; - - std::shared_ptr backend_; - std::weak_ptr subscriptions_; - std::shared_ptr balancer_; - DOSGuard& dosGuard_; - -public: - // Take ownership of the socket - explicit WsSession( - boost::asio::ip::tcp::socket&& socket, - std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr balancer, - DOSGuard& dosGuard) - : ws_(std::move(socket)) - , backend_(backend) - , subscriptions_(subscriptions) - , balancer_(balancer) - , dosGuard_(dosGuard) - { - } - - static void - make_session( - boost::asio::ip::tcp::socket&& socket, - std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr balancer, - DOSGuard& dosGuard) - { - std::make_shared( - std::move(socket), backend, subscriptions, balancer, dosGuard) - ->run(); - } - - ~session() - { - close(1012); - } - - void - send(std::string&& msg) - { - ws_.text(ws_.got_text()); - ws_.async_write( - boost::asio::buffer(msg), - boost::beast::bind_front_handler( - &session::on_write, shared_from_this())); - } - - void - close(boost::beast::websocket::close_reason const& cr) - { - boost::beast::error_code ec; - - ws_.close(cr, ec); - - if (ec) - return fail(ec, "close"); - } - -private: - // Get on the correct executor - void - run() - { - // We need to be executing within a strand to perform async operations - // on the I/O objects in this session. Although not strictly necessary - // for single-threaded contexts, this example code is written to be - // thread-safe by default. - boost::asio::dispatch( - ws_.get_executor(), - boost::beast::bind_front_handler( - &WsSession::on_run, shared_from_this())); - } - - // Start the asynchronous operation - void - on_run() - { - // Set suggested timeout settings for the websocket - ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested( - boost::beast::role_type::server)); - - // Set a decorator to change the Server of the handshake - ws_.set_option(boost::beast::websocket::stream_base::decorator( - [](boost::beast::websocket::response_type& res) { - res.set( - boost::beast::http::field::server, - std::string(BOOST_BEAST_VERSION_STRING) + - " websocket-server-async"); - })); - // Accept the websocket handshake - ws_.async_accept(boost::beast::bind_front_handler( - &WsSession::on_accept, shared_from_this())); - } - - void - on_accept(boost::beast::error_code ec) - { - if (ec) - return fail(ec, "accept"); - - // Read a message - do_read(); - } - - void - do_read() - { - // Read a message into our buffer - ws_.async_read( - buffer_, - boost::beast::bind_front_handler( - &WsSession::on_read, shared_from_this())); - } - - void - on_read(boost::beast::error_code ec, std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - // This indicates that the session was closed - if (ec == boost::beast::websocket::error::closed) - return; - - if (ec) - fail(ec, "read"); - - std::string msg{ - static_cast(buffer_.data().data()), buffer_.size()}; - // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; - boost::json::object response; - auto ip = - ws_.next_layer().socket().remote_endpoint().address().to_string(); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " received request from ip = " << ip; - if (!dosGuard_.isOk(ip)) - response["error"] = "Too many requests. Slow down"; - else - { - try - { - boost::json::value raw = boost::json::parse(msg); - boost::json::object request = raw.as_object(); - BOOST_LOG_TRIVIAL(debug) << " received request : " << request; - try - { - std::shared_ptr subPtr = - subscriptions_.lock(); - if (!subPtr) - return; - - auto [res, cost] = buildResponse( - request, - backend_, - subPtr, - balancer_, - shared_from_this()); - auto start = std::chrono::system_clock::now(); - response = std::move(res); - if (!dosGuard_.add(ip, cost)) - { - response["warning"] = "Too many requests"; - } - - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << " RPC call took " - << ((end - start).count() / 1000000000.0) - << " . request = " << request; - } - catch (Backend::DatabaseTimeout const& t) - { - BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; - response["error"] = - "Database read timeout. Please retry the request"; - } - } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(error) - << __func__ << "caught exception : " << e.what(); - response["error"] = "Unknown exception"; - } - } - BOOST_LOG_TRIVIAL(trace) << __func__ << response; - response_ = boost::json::serialize(response); - - // Echo the message - ws_.text(ws_.got_text()); - ws_.async_write( - boost::asio::buffer(response_), - boost::beast::bind_front_handler( - &WsSession::on_write, shared_from_this())); - } - - void - send(std::string&& msg) - { - ws_.text(ws_.got_text()); - ws_.async_write( - boost::asio::buffer(msg), - boost::beast::bind_front_handler( - &WsSession::on_write, shared_from_this())); - } - - void - on_write(boost::beast::error_code ec, std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - if (ec) - return fail(ec, "write"); - - // Clear the buffer - buffer_.consume(buffer_.size()); - - // Do another read - do_read(); - } -}; - -#endif // RIPPLE_REPORTING_SESSION_H diff --git a/server/websocket_server_async.cpp b/server/websocket_server_async.cpp index 147fad404..efac5cece 100644 --- a/server/websocket_server_async.cpp +++ b/server/websocket_server_async.cpp @@ -28,12 +28,8 @@ #include #include #include -#include -#include -#include -#include -#include -#include +#include +#include #include #include #include @@ -142,13 +138,6 @@ start(boost::asio::io_context& ioc, std::uint32_t numThreads) v.reserve(numThreads - 1); for (auto i = numThreads - 1; i > 0; --i) v.emplace_back([&ioc] { ioc.run(); }); - - std::make_shared>( - ioc, - ctx, - boost::asio::ip::tcp::endpoint{address, port}, - etl) - ->run(); } int @@ -167,7 +156,7 @@ main(int argc, char* argv[]) auto const threads = std::max(1, std::atoi(argv[1])); auto const config = parse_config(argv[2]); - auto const ctx = parse_certs(argv[3], argv[4]); + auto ctx = parse_certs(argv[3], argv[4]); if (argc > 5) { @@ -201,20 +190,42 @@ main(int argc, char* argv[]) std::shared_ptr ledgers{ NetworkValidatedLedgers::make_ValidatedLedgers()}; - std::shared_ptr balancer{ - ETLLoadBalancer::make_ETLLoadBalancer( - *config, ioc, backend, subscriptions, ledgers)}; - - std::shared_ptr etl{ReportingETL::make_ReportingETL( - *config, ioc, backend, subscriptions, balancer, ledgers)}; - - listener::make_listener( + auto balancer = ETLLoadBalancer::make_ETLLoadBalancer( + *config, + ioc, + backend, + subscriptions, + ledgers + ); + + auto etl = ReportingETL::make_ReportingETL( + *config, ioc, - boost::asio::ip::tcp::endpoint{address, port}, backend, subscriptions, balancer, - dosGuard); + ledgers + ); + + auto wsServer = Server::make_WebSocketServer( + *config, + ioc, + *ctx, + backend, + subscriptions, + balancer, + dosGuard + ); + + auto httpServer = Server::make_HttpServer( + *config, + ioc, + *ctx, + backend, + subscriptions, + balancer, + dosGuard + ); // Blocks until stopped. // When stopped, shared_ptrs fall out of scope