diff --git a/src/app/CliArgs.cpp b/src/app/CliArgs.cpp index 374ee986..5aae7ed7 100644 --- a/src/app/CliArgs.cpp +++ b/src/app/CliArgs.cpp @@ -47,7 +47,6 @@ CliArgs::parse(int argc, char const* argv[]) ("help,h", "Print help message and exit") ("version,v", "Print version and exit") ("conf,c", po::value()->default_value(kDEFAULT_CONFIG_PATH), "Configuration file") - ("ng-web-server,w", "Use ng-web-server") ("migrate", po::value(), "Start migration helper") ("verify", "Checks the validity of config values") ("config-description,d", po::value(), "Generate config description markdown file") @@ -93,8 +92,7 @@ CliArgs::parse(int argc, char const* argv[]) if (parsed.count("verify") != 0u) return Action{Action::VerifyConfig{.configPath = std::move(configPath)}}; - return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0} - }; + return Action{Action::Run{.configPath = std::move(configPath)}}; } } // namespace app diff --git a/src/app/CliArgs.hpp b/src/app/CliArgs.hpp index 8f2a8c35..aa465b5e 100644 --- a/src/app/CliArgs.hpp +++ b/src/app/CliArgs.hpp @@ -45,7 +45,6 @@ public: /** @brief Run action. */ struct Run { std::string configPath; ///< Configuration file path. - bool useNgWebServer; ///< Whether to use a ng web server }; /** @brief Exit action. */ diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 34a754ac..36b5c62e 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -49,8 +49,6 @@ #include "web/dosguard/IntervalSweepHandler.hpp" #include "web/dosguard/Weights.hpp" #include "web/dosguard/WhitelistHandler.hpp" -#include "web/ng/RPCServerHandler.hpp" -#include "web/ng/Server.hpp" #include @@ -96,7 +94,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi } int -ClioApplication::run(bool const useNgWebServer) +ClioApplication::run() { auto const threads = config_.get("io_threads"); LOG(util::LogService::info()) << "Number of io threads = " << threads; @@ -170,51 +168,37 @@ ClioApplication::run(bool const useNgWebServer) auto const rpcEngine = RPCEngineType::makeRPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider); - if (useNgWebServer or config_.get("server.__ng_web_server")) { - web::ng::RPCServerHandler handler{config_, backend, rpcEngine, etl, dosGuard}; + web::RPCServerHandler handler{config_, backend, rpcEngine, etl, dosGuard}; - auto expectedAdminVerifier = web::makeAdminVerificationStrategy(config_); - if (not expectedAdminVerifier.has_value()) { - LOG(util::LogService::error()) << "Error creating admin verifier: " << expectedAdminVerifier.error(); - return EXIT_FAILURE; - } - auto const adminVerifier = std::move(expectedAdminVerifier).value(); + auto expectedAdminVerifier = web::makeAdminVerificationStrategy(config_); + if (not expectedAdminVerifier.has_value()) { + LOG(util::LogService::error()) << "Error creating admin verifier: " << expectedAdminVerifier.error(); + return EXIT_FAILURE; + } + auto const adminVerifier = std::move(expectedAdminVerifier).value(); - auto httpServer = web::ng::makeServer(config_, OnConnectCheck{dosGuard}, DisconnectHook{dosGuard}, ioc); + auto httpServer = web::makeServer(config_, OnConnectCheck{dosGuard}, DisconnectHook{dosGuard}, ioc); - if (not httpServer.has_value()) { - LOG(util::LogService::error()) << "Error creating web server: " << httpServer.error(); - return EXIT_FAILURE; - } - - httpServer->onGet("/metrics", MetricsHandler{adminVerifier}); - httpServer->onGet("/health", HealthCheckHandler{}); - auto requestHandler = RequestHandler{adminVerifier, handler}; - httpServer->onPost("/", requestHandler); - httpServer->onWs(std::move(requestHandler)); - - auto const maybeError = httpServer->run(); - if (maybeError.has_value()) { - LOG(util::LogService::error()) << "Error starting web server: " << *maybeError; - return EXIT_FAILURE; - } - - appStopper_.setOnStop( - Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc) - ); - - // Blocks until stopped. - // When stopped, shared_ptrs fall out of scope - // Calls destructors on all resources, and destructs in order - start(ioc, threads); - - return EXIT_SUCCESS; + if (not httpServer.has_value()) { + LOG(util::LogService::error()) << "Error creating web server: " << httpServer.error(); + return EXIT_FAILURE; } - // Init the web server - auto handler = std::make_shared>(config_, backend, rpcEngine, etl, dosGuard); + httpServer->onGet("/metrics", MetricsHandler{adminVerifier}); + httpServer->onGet("/health", HealthCheckHandler{}); + auto requestHandler = RequestHandler{adminVerifier, handler}; + httpServer->onPost("/", requestHandler); + httpServer->onWs(std::move(requestHandler)); - auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler); + auto const maybeError = httpServer->run(); + if (maybeError.has_value()) { + LOG(util::LogService::error()) << "Error starting web server: " << *maybeError; + return EXIT_FAILURE; + } + + appStopper_.setOnStop( + Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc) + ); // Blocks until stopped. // When stopped, shared_ptrs fall out of scope diff --git a/src/app/ClioApplication.hpp b/src/app/ClioApplication.hpp index 871fcd6a..9d03ed0d 100644 --- a/src/app/ClioApplication.hpp +++ b/src/app/ClioApplication.hpp @@ -44,12 +44,10 @@ public: /** * @brief Run the application * - * @param useNgWebServer Whether to use the new web server - * * @return exit code */ int - run(bool useNgWebServer); + run(); }; } // namespace app diff --git a/src/app/Stopper.hpp b/src/app/Stopper.hpp index daba1164..056f6f3c 100644 --- a/src/app/Stopper.hpp +++ b/src/app/Stopper.hpp @@ -25,7 +25,7 @@ #include "feed/SubscriptionManagerInterface.hpp" #include "util/CoroutineGroup.hpp" #include "util/log/Logger.hpp" -#include "web/ng/Server.hpp" +#include "web/Server.hpp" #include #include @@ -74,7 +74,7 @@ public: * @param ioc The io_context to stop. * @return The callback to be called on application stop. */ - template + template static std::function makeOnStopCallback( ServerType& server, diff --git a/src/app/WebHandlers.cpp b/src/app/WebHandlers.cpp index 97cddb7a..20c3069f 100644 --- a/src/app/WebHandlers.cpp +++ b/src/app/WebHandlers.cpp @@ -22,11 +22,11 @@ #include "util/Assert.hpp" #include "util/prometheus/Http.hpp" #include "web/AdminVerificationStrategy.hpp" +#include "web/Connection.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include "web/SubscriptionContextInterface.hpp" #include "web/dosguard/DOSGuardInterface.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" #include #include @@ -41,13 +41,13 @@ OnConnectCheck::OnConnectCheck(web::dosguard::DOSGuardInterface& dosguard) : dos { } -std::expected -OnConnectCheck::operator()(web::ng::Connection const& connection) +std::expected +OnConnectCheck::operator()(web::Connection const& connection) { dosguard_.get().increment(connection.ip()); if (not dosguard_.get().isOk(connection.ip())) { return std::unexpected{ - web::ng::Response{boost::beast::http::status::too_many_requests, "Too many requests", connection} + web::Response{boost::beast::http::status::too_many_requests, "Too many requests", connection} }; } @@ -59,7 +59,7 @@ DisconnectHook::DisconnectHook(web::dosguard::DOSGuardInterface& dosguard) : dos } void -DisconnectHook::operator()(web::ng::Connection const& connection) +DisconnectHook::operator()(web::Connection const& connection) { dosguard_.get().decrement(connection.ip()); } @@ -69,10 +69,10 @@ MetricsHandler::MetricsHandler(std::shared_ptr a { } -web::ng::Response +web::Response MetricsHandler::operator()( - web::ng::Request const& request, - web::ng::ConnectionMetadata& connectionMetadata, + web::Request const& request, + web::ConnectionMetadata& connectionMetadata, web::SubscriptionContextPtr, boost::asio::yield_context ) @@ -86,13 +86,13 @@ MetricsHandler::operator()( httpRequest, adminVerifier_->isAdmin(httpRequest, connectionMetadata.ip()) ); ASSERT(maybeResponse.has_value(), "Got unexpected request for Prometheus"); - return web::ng::Response{std::move(maybeResponse).value(), request}; + return web::Response{std::move(maybeResponse).value(), request}; } -web::ng::Response +web::Response HealthCheckHandler::operator()( - web::ng::Request const& request, - web::ng::ConnectionMetadata&, + web::Request const& request, + web::ConnectionMetadata&, web::SubscriptionContextPtr, boost::asio::yield_context ) @@ -105,7 +105,7 @@ HealthCheckHandler::operator()( )html"; - return web::ng::Response{boost::beast::http::status::ok, kHEALTH_CHECK_HTML, request}; + return web::Response{boost::beast::http::status::ok, kHEALTH_CHECK_HTML, request}; } } // namespace app diff --git a/src/app/WebHandlers.hpp b/src/app/WebHandlers.hpp index 5dacc222..35afa2e5 100644 --- a/src/app/WebHandlers.hpp +++ b/src/app/WebHandlers.hpp @@ -22,11 +22,11 @@ #include "rpc/Errors.hpp" #include "util/log/Logger.hpp" #include "web/AdminVerificationStrategy.hpp" +#include "web/Connection.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include "web/SubscriptionContextInterface.hpp" #include "web/dosguard/DOSGuardInterface.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" #include #include @@ -60,8 +60,8 @@ public: * @param connection The connection to check. * @return A response if the connection is not allowed to proceed or void otherwise. */ - std::expected - operator()(web::ng::Connection const& connection); + std::expected + operator()(web::Connection const& connection); }; /** @@ -84,7 +84,7 @@ public: * @param connection The connection which has disconnected. */ void - operator()(web::ng::Connection const& connection); + operator()(web::Connection const& connection); }; /** @@ -108,10 +108,10 @@ public: * @param connectionMetadata The connection metadata. * @return The response to the request. */ - web::ng::Response + web::Response operator()( - web::ng::Request const& request, - web::ng::ConnectionMetadata& connectionMetadata, + web::Request const& request, + web::ConnectionMetadata& connectionMetadata, web::SubscriptionContextPtr, boost::asio::yield_context ); @@ -128,10 +128,10 @@ public: * @param request The request to handle. * @return The response to the request */ - web::ng::Response + web::Response operator()( - web::ng::Request const& request, - web::ng::ConnectionMetadata&, + web::Request const& request, + web::ConnectionMetadata&, web::SubscriptionContextPtr, boost::asio::yield_context ); @@ -169,10 +169,10 @@ public: * @param yield The yield context. * @return The response to the request. */ - web::ng::Response + web::Response operator()( - web::ng::Request const& request, - web::ng::ConnectionMetadata& connectionMetadata, + web::Request const& request, + web::ConnectionMetadata& connectionMetadata, web::SubscriptionContextPtr subscriptionContext, boost::asio::yield_context yield ) @@ -188,7 +188,7 @@ public: try { return rpcHandler_(request, connectionMetadata, std::move(subscriptionContext), yield); } catch (std::exception const&) { - return web::ng::Response{ + return web::Response{ boost::beast::http::status::internal_server_error, rpc::makeError(rpc::RippledError::rpcINTERNAL), request diff --git a/src/main/Main.cpp b/src/main/Main.cpp index 4aebd3c8..5447d56c 100644 --- a/src/main/Main.cpp +++ b/src/main/Main.cpp @@ -57,7 +57,7 @@ try { return EXIT_FAILURE; } app::ClioApplication clio{gClioConfig}; - return clio.run(run.useNgWebServer); + return clio.run(); }, [](app::CliArgs::Action::Migrate const& migrate) { if (not app::parseConfig(migrate.configPath)) diff --git a/src/util/config/ConfigDefinition.hpp b/src/util/config/ConfigDefinition.hpp index 558ccb7c..28944679 100644 --- a/src/util/config/ConfigDefinition.hpp +++ b/src/util/config/ConfigDefinition.hpp @@ -338,7 +338,6 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{ {"server.parallel_requests_limit", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint16)}, {"server.ws_max_sending_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(1500).withConstraint(gValidateUint32)}, - {"server.__ng_web_server", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"prometheus.enabled", ConfigValue{ConfigType::Boolean}.defaultValue(true)}, {"prometheus.compress_reply", ConfigValue{ConfigType::Boolean}.defaultValue(true)}, diff --git a/src/web/CMakeLists.txt b/src/web/CMakeLists.txt index 2b62bbe4..2bd0938e 100644 --- a/src/web/CMakeLists.txt +++ b/src/web/CMakeLists.txt @@ -7,14 +7,14 @@ target_sources( dosguard/IntervalSweepHandler.cpp dosguard/Weights.cpp dosguard/WhitelistHandler.cpp - ng/Connection.cpp - ng/impl/ErrorHandling.cpp - ng/impl/ConnectionHandler.cpp - ng/impl/ServerSslContext.cpp - ng/Request.cpp - ng/Response.cpp - ng/Server.cpp - ng/SubscriptionContext.cpp + Connection.cpp + impl/ErrorHandling.cpp + impl/ConnectionHandler.cpp + impl/ServerSslContext.cpp + Request.cpp + Response.cpp + Server.cpp + SubscriptionContext.cpp Resolver.cpp SubscriptionContext.cpp ) diff --git a/src/web/ng/Connection.cpp b/src/web/Connection.cpp similarity index 95% rename from src/web/ng/Connection.cpp rename to src/web/Connection.cpp index e02fe64b..5b9e8b50 100644 --- a/src/web/ng/Connection.cpp +++ b/src/web/Connection.cpp @@ -17,7 +17,7 @@ */ //============================================================================== -#include "web/ng/Connection.hpp" +#include "web/Connection.hpp" #include "util/Taggable.hpp" @@ -26,7 +26,7 @@ #include #include -namespace web::ng { +namespace web { ConnectionMetadata::ConnectionMetadata(std::string ip, util::TagDecoratorFactory const& tagDecoratorFactory) : util::Taggable(tagDecoratorFactory), ip_{std::move(ip)} @@ -54,4 +54,4 @@ Connection::Connection( { } -} // namespace web::ng +} // namespace web diff --git a/src/web/ng/Connection.hpp b/src/web/Connection.hpp similarity index 97% rename from src/web/ng/Connection.hpp rename to src/web/Connection.hpp index 9283e964..3cd9dc78 100644 --- a/src/web/ng/Connection.hpp +++ b/src/web/Connection.hpp @@ -20,9 +20,9 @@ #pragma once #include "util/Taggable.hpp" -#include "web/ng/Error.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" +#include "web/Error.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include #include @@ -35,7 +35,7 @@ #include #include -namespace web::ng { +namespace web { /** * @brief An interface for a connection metadata class. @@ -159,4 +159,4 @@ public: */ using ConnectionPtr = std::unique_ptr; -} // namespace web::ng +} // namespace web diff --git a/src/web/ng/Error.hpp b/src/web/Error.hpp similarity index 96% rename from src/web/ng/Error.hpp rename to src/web/Error.hpp index 93f35646..d4c8f53b 100644 --- a/src/web/ng/Error.hpp +++ b/src/web/Error.hpp @@ -21,11 +21,11 @@ #include -namespace web::ng { +namespace web { /** * @brief Error of any async operation. */ using Error = boost::system::error_code; -} // namespace web::ng +} // namespace web diff --git a/src/web/HttpSession.hpp b/src/web/HttpSession.hpp deleted file mode 100644 index a60979b2..00000000 --- a/src/web/HttpSession.hpp +++ /dev/null @@ -1,144 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#pragma once - -#include "util/Taggable.hpp" -#include "web/AdminVerificationStrategy.hpp" -#include "web/PlainWsSession.hpp" -#include "web/dosguard/DOSGuardInterface.hpp" -#include "web/impl/HttpBase.hpp" -#include "web/interface/Concepts.hpp" -#include "web/interface/ConnectionBase.hpp" - -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -namespace web { - -using tcp = boost::asio::ip::tcp; - -/** - * @brief Represents a HTTP connection established by a client. - * - * It will handle the upgrade to websocket, pass the ownership of the socket to the upgrade session. - * Otherwise, it will pass control to the base class. - * - * @tparam HandlerType The type of the server handler to use - */ -template -class HttpSession : public impl::HttpBase, - public std::enable_shared_from_this> { - boost::beast::tcp_stream stream_; - std::reference_wrapper tagFactory_; - std::uint32_t maxWsSendingQueueSize_; - -public: - /** - * @brief Create a new session. - * - * @param socket The socket. Ownership is transferred to HttpSession - * @param ip Client's IP address - * @param adminVerification The admin verification strategy to use - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param buffer Buffer with initial data received from the peer - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket - */ - explicit HttpSession( - tcp::socket&& socket, - std::string const& ip, - std::shared_ptr const& adminVerification, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr const& handler, - boost::beast::flat_buffer buffer, - std::uint32_t maxWsSendingQueueSize - ) - : impl::HttpBase( - ip, - tagFactory, - adminVerification, - dosGuard, - handler, - std::move(buffer) - ) - , stream_(std::move(socket)) - , tagFactory_(tagFactory) - , maxWsSendingQueueSize_(maxWsSendingQueueSize) - { - } - - ~HttpSession() override = default; - - /** @return The TCP stream */ - boost::beast::tcp_stream& - stream() - { - return stream_; - } - - /** @brief Starts reading from the stream. */ - void - run() - { - boost::asio::dispatch( - stream_.get_executor(), - boost::beast::bind_front_handler( - &impl::HttpBase::doRead, this->shared_from_this() - ) - ); - } - - /** @brief Closes the underlying socket. */ - void - doClose() - { - boost::beast::error_code ec; - stream_.socket().shutdown(tcp::socket::shutdown_send, ec); - } - - /** @brief Upgrade to WebSocket connection. */ - void - upgrade() - { - std::make_shared>( - std::move(stream_), - this->clientIp, - tagFactory_, - this->dosGuard_, - this->handler_, - std::move(this->buffer_), - std::move(this->req_), - ConnectionBase::isAdmin(), - maxWsSendingQueueSize_ - ) - ->run(); - } -}; -} // namespace web diff --git a/src/web/ng/MessageHandler.hpp b/src/web/MessageHandler.hpp similarity index 90% rename from src/web/ng/MessageHandler.hpp rename to src/web/MessageHandler.hpp index 6b3224c3..ec2f0a1a 100644 --- a/src/web/ng/MessageHandler.hpp +++ b/src/web/MessageHandler.hpp @@ -19,16 +19,16 @@ #pragma once +#include "web/Connection.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include "web/SubscriptionContextInterface.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" #include #include -namespace web::ng { +namespace web { /** * @brief Handler for messages. @@ -36,4 +36,4 @@ namespace web::ng { using MessageHandler = std::function; -} // namespace web::ng +} // namespace web diff --git a/src/web/PlainWsSession.hpp b/src/web/PlainWsSession.hpp deleted file mode 100644 index 6066765a..00000000 --- a/src/web/PlainWsSession.hpp +++ /dev/null @@ -1,204 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#pragma once - -#include "util/Taggable.hpp" -#include "web/dosguard/DOSGuardInterface.hpp" -#include "web/impl/WsBase.hpp" -#include "web/interface/ConnectionBase.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -namespace web { - -/** - * @brief Represents a non-secure websocket session. - * - * Majority of the operations are handled by the base class. - */ -template -class PlainWsSession : public impl::WsBase { - using StreamType = boost::beast::websocket::stream; - StreamType ws_; - -public: - /** - * @brief Create a new non-secure websocket session. - * - * @param socket The socket. Ownership is transferred - * @param ip Client's IP address - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param buffer Buffer with initial data received from the peer - * @param isAdmin Whether the connection has admin privileges, - * @param maxSendingQueueSize The maximum size of the sending queue for websocket - */ - explicit PlainWsSession( - boost::asio::ip::tcp::socket&& socket, - std::string ip, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr const& handler, - boost::beast::flat_buffer&& buffer, - bool isAdmin, - std::uint32_t maxSendingQueueSize - ) - : impl::WsBase( - ip, - tagFactory, - dosGuard, - handler, - std::move(buffer), - maxSendingQueueSize - ) - , ws_(std::move(socket)) - { - ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer) - } - - ~PlainWsSession() override = default; - - /** @return The websocket stream. */ - StreamType& - ws() - { - return ws_; - } -}; - -/** - * @brief The websocket upgrader class, upgrade from an HTTP session to a non-secure websocket session. - * - * Pass the socket to the session class after upgrade. - */ -template -class WsUpgrader : public std::enable_shared_from_this> { - using std::enable_shared_from_this>::shared_from_this; - - boost::beast::tcp_stream http_; - boost::optional> parser_; - boost::beast::flat_buffer buffer_; - std::reference_wrapper tagFactory_; - std::reference_wrapper dosGuard_; - http::request req_; - std::string ip_; - std::shared_ptr const handler_; - bool isAdmin_; - std::uint32_t maxWsSendingQueueSize_; - -public: - /** - * @brief Create a new upgrader to non-secure websocket. - * - * @param stream The TCP stream. Ownership is transferred - * @param ip Client's IP address - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param buffer Buffer with initial data received from the peer. Ownership is transferred - * @param request The request. Ownership is transferred - * @param isAdmin Whether the connection has admin privileges - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket - */ - WsUpgrader( - boost::beast::tcp_stream&& stream, - std::string ip, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr const& handler, - boost::beast::flat_buffer&& buffer, - http::request request, - bool isAdmin, - std::uint32_t maxWsSendingQueueSize - ) - : http_(std::move(stream)) - , buffer_(std::move(buffer)) - , tagFactory_(tagFactory) - , dosGuard_(dosGuard) - , req_(std::move(request)) - , ip_(std::move(ip)) - , handler_(handler) - , isAdmin_(isAdmin) - , maxWsSendingQueueSize_(maxWsSendingQueueSize) - { - } - - /** @brief Initiate the upgrade. */ - void - run() - { - boost::asio::dispatch( - http_.get_executor(), - boost::beast::bind_front_handler(&WsUpgrader::doUpgrade, shared_from_this()) - ); - } - -private: - void - doUpgrade() - { - parser_.emplace(); - - static constexpr auto kMAX_BODY_SIZE = 10000; - parser_->body_limit(kMAX_BODY_SIZE); - - boost::beast::get_lowest_layer(http_).expires_after(std::chrono::seconds(30)); - onUpgrade(); - } - - void - onUpgrade() - { - if (!boost::beast::websocket::is_upgrade(req_)) - return; - - // Disable the timeout. The websocket::stream uses its own timeout settings. - boost::beast::get_lowest_layer(http_).expires_never(); - - std::make_shared>( - http_.release_socket(), - ip_, - tagFactory_, - dosGuard_, - handler_, - std::move(buffer_), - isAdmin_, - maxWsSendingQueueSize_ - ) - ->run(std::move(req_)); - } -}; - -} // namespace web diff --git a/src/web/ng/ProcessingPolicy.hpp b/src/web/ProcessingPolicy.hpp similarity index 96% rename from src/web/ng/ProcessingPolicy.hpp rename to src/web/ProcessingPolicy.hpp index ad69a744..d336866d 100644 --- a/src/web/ng/ProcessingPolicy.hpp +++ b/src/web/ProcessingPolicy.hpp @@ -19,11 +19,11 @@ #pragma once -namespace web::ng { +namespace web { /** * @brief Requests processing policy. */ enum class ProcessingPolicy { Sequential, Parallel }; -} // namespace web::ng +} // namespace web diff --git a/src/web/README.md b/src/web/README.md index 76b0ce41..cf7768e2 100644 --- a/src/web/README.md +++ b/src/web/README.md @@ -1,15 +1,67 @@ -# Web server subsystem +# Web Server Subsystem -This folder contains all of the classes for running the web server. +This folder contains all of the classes for running Clio's web server. + +## Overview The web server subsystem: -- Handles JSON-RPC and websocket requests. +- Handles JSON-RPC requests over HTTP and WebSocket connections +- Supports SSL/TLS encryption if certificate and key files are specified in the config +- Processes all types of requests on a single port +- Implements asynchronous request handling using [Boost Asio](https://www.boost.org/doc/libs/1_83_0/doc/html/boost_asio.html) +- Provides request rate limiting through a built-in Denial-of-Service (DoS) Guard mechanism +- Supports both sequential and parallel request processing policies -- Supports SSL if a cert and key file are specified in the config. +## Key Components -- Handles all types of requests on a single port. +### Core Components -Each request is handled asynchronously using [Boost Asio](https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio.html). +- **Server** (`Server.hpp/cpp`): The main web server class that manages connections and routes requests +- **Connection** (`Connection.hpp/cpp`): Represents a client connection and provides an abstraction layer over HTTP and WebSocket connections +- **Request/Response** (`Request.hpp/cpp`, `Response.hpp/cpp`): Classes for handling HTTP requests and responses +- **MessageHandler** (`MessageHandler.hpp`): An interface for handling different types of messages (e.g., HTTP GET/POST, WebSocket) +- **RPCServerHandler** (`RPCServerHandler.hpp`): Handles RPC requests and integrates with the RPC engine -Much of this code was originally copied from Boost beast example code. +### Connection Processing + +- **ConnectionHandler** (`impl/ConnectionHandler.hpp/cpp`): Manages the lifecycle of connections and processes requests +- **ProcessingPolicy** (`ProcessingPolicy.hpp`): Defines whether requests are processed sequentially or in parallel +- **HttpConnection/WsConnection** (`impl/HttpConnection.hpp`, `impl/WsConnection.hpp`): Concrete implementations for HTTP and WebSocket connections + +### Security Features + +- **DOSGuard** (`dosguard/DOSGuard.hpp/cpp`): Denial-of-Service protection system that implements rate limiting +- **IntervalSweepHandler** (`dosguard/IntervalSweepHandler.hpp/cpp`): Periodically clears DoS guard state +- **WhitelistHandler** (`dosguard/WhitelistHandler.hpp/cpp`): Manages IP address whitelisting for bypass of rate limits +- **AdminVerificationStrategy** (`AdminVerificationStrategy.hpp/cpp`): Handles verification of admin privileges + +### Subscription + +- **SubscriptionContext** (`SubscriptionContext.hpp/cpp`): Manages WebSocket subscriptions for streaming updates + +## Architecture + +The server design uses the following patterns: + +- **RAII**: Resource management through C++ RAII principles +- **Dependency Injection**: Components accept their dependencies through constructor parameters +- **Interface-based design**: Components depend on interfaces rather than concrete implementations +- **Asynchronous programming**: Uses Boost Asio for non-blocking I/O operations with coroutines + +Each incoming request is handled asynchronously, with the processing being dispatched to appropriate handlers based on the request type (GET, POST, WebSocket). The server supports both secure (SSL/TLS) and non-secure connections based on configuration. + +## SSL Support + +The server creates an SSL context if certificate and key files are specified in the configuration. When SSL is enabled, all connections are encrypted. + +## Request Flow + +1. Client connects to the server +2. Server performs security checks (e.g., DoS Guard, admin verification if needed) +3. Server reads the request asynchronously +4. Request is routed to appropriate handler based on HTTP method and target +5. Handler processes the request and generates a response +6. Response is sent back to the client +7. For persistent connections, the server returns to step 3 +8. When the client disconnects or an error occurs, the connection is closed diff --git a/src/web/RPCServerHandler.hpp b/src/web/RPCServerHandler.hpp index 851f62a7..a13f0062 100644 --- a/src/web/RPCServerHandler.hpp +++ b/src/web/RPCServerHandler.hpp @@ -26,17 +26,23 @@ #include "rpc/JS.hpp" #include "rpc/RPCHelpers.hpp" #include "rpc/common/impl/APIVersionParser.hpp" +#include "util/Assert.hpp" +#include "util/CoroutineGroup.hpp" #include "util/JsonUtils.hpp" #include "util/Profiler.hpp" #include "util/Taggable.hpp" -#include "util/config/ConfigDefinition.hpp" #include "util/log/Logger.hpp" +#include "web/Connection.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" +#include "web/SubscriptionContextInterface.hpp" #include "web/dosguard/DOSGuardInterface.hpp" #include "web/impl/ErrorHandling.hpp" -#include "web/interface/ConnectionBase.hpp" #include +#include #include +#include #include #include #include @@ -48,8 +54,8 @@ #include #include #include +#include #include -#include #include #include @@ -65,9 +71,9 @@ class RPCServerHandler { std::shared_ptr const backend_; std::shared_ptr const rpcEngine_; std::shared_ptr const etl_; + std::reference_wrapper dosguard_; util::TagDecoratorFactory const tagFactory_; rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed - std::reference_wrapper dosguard_; util::Logger log_{"RPC"}; util::Logger perfLog_{"Performance"}; @@ -87,14 +93,14 @@ public: std::shared_ptr const& backend, std::shared_ptr const& rpcEngine, std::shared_ptr const& etl, - web::dosguard::DOSGuardInterface& dosguard + dosguard::DOSGuardInterface& dosguard ) : backend_(backend) , rpcEngine_(rpcEngine) , etl_(etl) + , dosguard_(dosguard) , tagFactory_(config) , apiVersionParser_(config.getObject("api_version")) - , dosguard_(dosguard) { } @@ -102,123 +108,165 @@ public: * @brief The callback when server receives a request. * * @param request The request - * @param connection The connection + * @param connectionMetadata The connection metadata + * @param subscriptionContext The subscription context + * @param yield The yield context + * @return The response */ - void - operator()(std::string const& request, std::shared_ptr const& connection) + [[nodiscard]] Response + operator()( + Request const& request, + ConnectionMetadata const& connectionMetadata, + SubscriptionContextPtr subscriptionContext, + boost::asio::yield_context yield + ) { - if (not dosguard_.get().isOk(connection->clientIp)) { - connection->sendSlowDown(request); - return; + if (not dosguard_.get().isOk(connectionMetadata.ip())) { + return makeSlowDownResponse(request, std::nullopt); } - try { - auto req = boost::json::parse(request).as_object(); - LOG(perfLog_.debug()) << connection->tag() << "Adding to work queue"; + std::optional response; + util::CoroutineGroup coroutineGroup{yield, 1}; + auto const onTaskComplete = coroutineGroup.registerForeign(yield); + ASSERT(onTaskComplete.has_value(), "Coroutine group can't be full"); - if (not connection->upgraded and shouldReplaceParams(req)) - req[JS(params)] = boost::json::array({boost::json::object{}}); + bool const postSuccessful = rpcEngine_->post( + [this, + &request, + &response, + &onTaskComplete = onTaskComplete.value(), + &connectionMetadata, + subscriptionContext = std::move(subscriptionContext)](boost::asio::yield_context innerYield) mutable { + try { + boost::system::error_code ec; + auto parsedRequest = boost::json::parse(request.message(), ec); + if (ec.failed() or not parsedRequest.is_object()) { + rpcEngine_->notifyBadSyntax(); + response = impl::ErrorHelper{request}.makeJsonParsingError(); + if (ec.failed()) { + LOG(log_.warn()) + << "Error parsing JSON: " << ec.message() << ". For request: " << request.message(); + } else { + LOG(log_.warn()) << "Received not a JSON object. For request: " << request.message(); + } + } else { + auto parsedObject = std::move(parsedRequest).as_object(); - if (not dosguard_.get().request(connection->clientIp, req)) { - connection->sendSlowDown(request); - return; - } + if (not dosguard_.get().request(connectionMetadata.ip(), parsedObject)) { + response = makeSlowDownResponse(request, parsedObject); + } else { + LOG(perfLog_.debug()) << connectionMetadata.tag() << "Adding to work queue"; - if (!rpcEngine_->post( - [this, request = std::move(req), connection](boost::asio::yield_context yield) mutable { - handleRequest(yield, std::move(request), connection); - }, - connection->clientIp - )) { - rpcEngine_->notifyTooBusy(); - web::impl::ErrorHelper(connection).sendTooBusyError(); - } - } catch (boost::system::system_error const& ex) { - // system_error thrown when json parsing failed - rpcEngine_->notifyBadSyntax(); - web::impl::ErrorHelper(connection).sendJsonParsingError(); - LOG(log_.warn()) << "Error parsing JSON: " << ex.what() << ". For request: " << request; - } catch (std::invalid_argument const& ex) { - // thrown when json parses something that is not an object at top level - rpcEngine_->notifyBadSyntax(); - LOG(log_.warn()) << "Invalid argument error: " << ex.what() << ". For request: " << request; - web::impl::ErrorHelper(connection).sendJsonParsingError(); - } catch (std::exception const& ex) { - LOG(perfLog_.error()) << connection->tag() << "Caught exception: " << ex.what(); - rpcEngine_->notifyInternalError(); - throw; + if (not connectionMetadata.wasUpgraded() and shouldReplaceParams(parsedObject)) + parsedObject[JS(params)] = boost::json::array({boost::json::object{}}); + + response = handleRequest( + innerYield, + request, + std::move(parsedObject), + connectionMetadata, + std::move(subscriptionContext) + ); + } + } + } catch (std::exception const& ex) { + LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what(); + rpcEngine_->notifyInternalError(); + response = impl::ErrorHelper{request}.makeInternalError(); + } + + // notify the coroutine group that the foreign task is done + onTaskComplete(); + }, + connectionMetadata.ip() + ); + + if (not postSuccessful) { + // onTaskComplete must be called to notify coroutineGroup that the foreign task is done + onTaskComplete->operator()(); + rpcEngine_->notifyTooBusy(); + return impl::ErrorHelper{request}.makeTooBusyError(); } + + // Put the coroutine to sleep until the foreign task is done + coroutineGroup.asyncWait(yield); + ASSERT(response.has_value(), "Woke up coroutine without setting response"); + + if (not dosguard_.get().add(connectionMetadata.ip(), response->message().size())) { + response->setMessage(makeLoadWarning(*response)); + } + + return std::move(response).value(); } private: - void + Response handleRequest( boost::asio::yield_context yield, + Request const& rawRequest, boost::json::object&& request, - std::shared_ptr const& connection + ConnectionMetadata const& connectionMetadata, + SubscriptionContextPtr subscriptionContext ) { - LOG(log_.info()) << connection->tag() << (connection->upgraded ? "ws" : "http") + LOG(log_.info()) << connectionMetadata.tag() << (connectionMetadata.wasUpgraded() ? "ws" : "http") << " received request from work queue: " << util::removeSecret(request) - << " ip = " << connection->clientIp; + << " ip = " << connectionMetadata.ip(); try { auto const range = backend_->fetchLedgerRange(); if (!range) { // for error that happened before the handler, we don't attach any warnings rpcEngine_->notifyNotReady(); - web::impl::ErrorHelper(connection, std::move(request)).sendNotReadyError(); - - return; + return impl::ErrorHelper{rawRequest, std::move(request)}.makeNotReadyError(); } auto const context = [&] { - if (connection->upgraded) { + if (connectionMetadata.wasUpgraded()) { + ASSERT(subscriptionContext != nullptr, "Subscription context must exist for a WS connection"); return rpc::makeWsContext( yield, request, - connection->makeSubscriptionContext(tagFactory_), - tagFactory_.with(connection->tag()), + std::move(subscriptionContext), + tagFactory_.with(connectionMetadata.tag()), *range, - connection->clientIp, + connectionMetadata.ip(), std::cref(apiVersionParser_), - connection->isAdmin() + connectionMetadata.isAdmin() ); } return rpc::makeHttpContext( yield, request, - tagFactory_.with(connection->tag()), + tagFactory_.with(connectionMetadata.tag()), *range, - connection->clientIp, + connectionMetadata.ip(), std::cref(apiVersionParser_), - connection->isAdmin() + connectionMetadata.isAdmin() ); }(); if (!context) { auto const err = context.error(); - LOG(perfLog_.warn()) << connection->tag() << "Could not create Web context: " << err; - LOG(log_.warn()) << connection->tag() << "Could not create Web context: " << err; + LOG(perfLog_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err; + LOG(log_.warn()) << connectionMetadata.tag() << "Could not create Web context: " << err; // we count all those as BadSyntax - as the WS path would. // Although over HTTP these will yield a 400 status with a plain text response (for most). rpcEngine_->notifyBadSyntax(); - web::impl::ErrorHelper(connection, std::move(request)).sendError(err); - - return; + return impl::ErrorHelper(rawRequest, std::move(request)).makeError(err); } auto [result, timeDiff] = util::timed([&]() { return rpcEngine_->buildResponse(*context); }); - auto const us = std::chrono::duration(timeDiff); + auto us = std::chrono::duration(timeDiff); rpc::logDuration(request, context->tag(), us); boost::json::object response; if (!result.response.has_value()) { // note: error statuses are counted/notified in buildResponse itself - response = web::impl::ErrorHelper(connection, request).composeError(result.response.error()); + response = impl::ErrorHelper(rawRequest, request).composeError(result.response.error()); auto const responseStr = boost::json::serialize(response); LOG(perfLog_.debug()) << context->tag() << "Encountered error: " << responseStr; @@ -237,7 +285,7 @@ private: // if the result is forwarded - just use it as is // if forwarded request has error, for http, error should be in "result"; for ws, error should // be at top - if (isForwarded && (json.contains(JS(result)) || connection->upgraded)) { + if (isForwarded && (json.contains(JS(result)) || connectionMetadata.wasUpgraded())) { for (auto const& [k, v] : json) response.insert_or_assign(k, v); } else { @@ -249,7 +297,7 @@ private: // for ws there is an additional field "status" in the response, // otherwise the "status" is in the "result" field - if (connection->upgraded) { + if (connectionMetadata.wasUpgraded()) { auto const appendFieldIfExist = [&](auto const& field) { if (request.contains(field) and not request.at(field).is_null()) response[field] = request.at(field); @@ -275,20 +323,51 @@ private: warnings.emplace_back(rpc::makeWarning(rpc::WarnRpcOutdated)); response["warnings"] = warnings; - connection->send(boost::json::serialize(response)); + return Response{boost::beast::http::status::ok, response, rawRequest}; } catch (std::exception const& ex) { // note: while we are catching this in buildResponse too, this is here to make sure // that any other code that may throw is outside of buildResponse is also worked around. - LOG(perfLog_.error()) << connection->tag() << "Caught exception: " << ex.what(); - LOG(log_.error()) << connection->tag() << "Caught exception: " << ex.what(); + LOG(perfLog_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what(); + LOG(log_.error()) << connectionMetadata.tag() << "Caught exception: " << ex.what(); rpcEngine_->notifyInternalError(); - web::impl::ErrorHelper(connection, std::move(request)).sendInternalError(); - - return; + return impl::ErrorHelper(rawRequest, std::move(request)).makeInternalError(); } } + static Response + makeSlowDownResponse(Request const& request, std::optional requestJson) + { + auto error = rpc::makeError(rpc::RippledError::rpcSLOW_DOWN); + + if (not request.isHttp()) { + try { + if (not requestJson.has_value()) { + requestJson = boost::json::parse(request.message()); + } + if (requestJson->is_object() && requestJson->as_object().contains("id")) + error["id"] = requestJson->as_object().at("id"); + error["request"] = request.message(); + } catch (std::exception const&) { + error["request"] = request.message(); + } + } + return web::Response{boost::beast::http::status::service_unavailable, error, request}; + } + + static boost::json::object + makeLoadWarning(Response const& response) + { + auto jsonResponse = boost::json::parse(response.message()).as_object(); + jsonResponse["warning"] = "load"; + if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array()) { + jsonResponse["warnings"].as_array().push_back(rpc::makeWarning(rpc::WarnRpcRateLimit)); + } else { + jsonResponse["warnings"] = boost::json::array{rpc::makeWarning(rpc::WarnRpcRateLimit)}; + } + return jsonResponse; + } + bool shouldReplaceParams(boost::json::object const& req) const { diff --git a/src/web/ng/Request.cpp b/src/web/Request.cpp similarity index 98% rename from src/web/ng/Request.cpp rename to src/web/Request.cpp index 9bf1c9a4..a9a7385c 100644 --- a/src/web/ng/Request.cpp +++ b/src/web/Request.cpp @@ -17,7 +17,7 @@ */ //============================================================================== -#include "web/ng/Request.hpp" +#include "web/Request.hpp" #include "util/OverloadSet.hpp" @@ -33,7 +33,7 @@ #include #include -namespace web::ng { +namespace web { namespace { @@ -142,4 +142,4 @@ Request::httpRequest() const return std::get(data_); } -} // namespace web::ng +} // namespace web diff --git a/src/web/ng/Request.hpp b/src/web/Request.hpp similarity index 99% rename from src/web/ng/Request.hpp rename to src/web/Request.hpp index 55f4ff56..5ece22ca 100644 --- a/src/web/ng/Request.hpp +++ b/src/web/Request.hpp @@ -29,7 +29,7 @@ #include #include -namespace web::ng { +namespace web { /** * @brief Represents an HTTP or WebSocket request. @@ -150,4 +150,4 @@ private: httpRequest() const; }; -} // namespace web::ng +} // namespace web diff --git a/src/web/ng/Response.cpp b/src/web/Response.cpp similarity index 97% rename from src/web/ng/Response.cpp rename to src/web/Response.cpp index 94f40278..08b4cb20 100644 --- a/src/web/ng/Response.cpp +++ b/src/web/Response.cpp @@ -17,13 +17,13 @@ */ //============================================================================== -#include "web/ng/Response.hpp" +#include "web/Response.hpp" #include "util/Assert.hpp" #include "util/OverloadSet.hpp" #include "util/build/Build.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/Request.hpp" +#include "web/Connection.hpp" +#include "web/Request.hpp" #include #include @@ -43,7 +43,7 @@ namespace http = boost::beast::http; -namespace web::ng { +namespace web { namespace { @@ -193,4 +193,4 @@ Response::asWsResponse() const& return boost::asio::buffer(message.data(), message.size()); } -} // namespace web::ng +} // namespace web diff --git a/src/web/ng/Response.hpp b/src/web/Response.hpp similarity index 98% rename from src/web/ng/Response.hpp rename to src/web/Response.hpp index 06324951..7deeeb74 100644 --- a/src/web/ng/Response.hpp +++ b/src/web/Response.hpp @@ -19,7 +19,7 @@ #pragma once -#include "web/ng/Request.hpp" +#include "web/Request.hpp" #include #include @@ -30,7 +30,7 @@ #include #include -namespace web::ng { +namespace web { class Connection; @@ -133,4 +133,4 @@ public: asWsResponse() const&; }; -} // namespace web::ng +} // namespace web diff --git a/src/web/ng/Server.cpp b/src/web/Server.cpp similarity index 97% rename from src/web/ng/Server.cpp rename to src/web/Server.cpp index a8bb6477..4790295c 100644 --- a/src/web/ng/Server.cpp +++ b/src/web/Server.cpp @@ -17,19 +17,19 @@ */ //============================================================================== -#include "web/ng/Server.hpp" +#include "web/Server.hpp" #include "util/Assert.hpp" #include "util/Taggable.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/config/ObjectView.hpp" #include "util/log/Logger.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/MessageHandler.hpp" -#include "web/ng/ProcessingPolicy.hpp" -#include "web/ng/Response.hpp" -#include "web/ng/impl/HttpConnection.hpp" -#include "web/ng/impl/ServerSslContext.hpp" +#include "web/Connection.hpp" +#include "web/MessageHandler.hpp" +#include "web/ProcessingPolicy.hpp" +#include "web/Response.hpp" +#include "web/impl/HttpConnection.hpp" +#include "web/impl/ServerSslContext.hpp" #include #include @@ -54,7 +54,7 @@ #include #include -namespace web::ng { +namespace web { namespace { @@ -316,7 +316,7 @@ Server::handleConnection(boost::asio::ip::tcp::socket socket, boost::asio::yield boost::asio::spawn( ctx_.get(), [connection = std::move(connectionExpected).value()](boost::asio::yield_context yield) { - web::ng::impl::ConnectionHandler::stopConnection(*connection, yield); + web::impl::ConnectionHandler::stopConnection(*connection, yield); } ); return; @@ -382,4 +382,4 @@ makeServer( }; } -} // namespace web::ng +} // namespace web diff --git a/src/web/Server.hpp b/src/web/Server.hpp index b480b322..180d7428 100644 --- a/src/web/Server.hpp +++ b/src/web/Server.hpp @@ -1,7 +1,7 @@ //------------------------------------------------------------------------------ /* This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. + Copyright (c) 2024, the clio developers. Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above @@ -20,363 +20,173 @@ #pragma once #include "util/Taggable.hpp" +#include "util/config/ConfigDefinition.hpp" #include "util/log/Logger.hpp" -#include "web/AdminVerificationStrategy.hpp" -#include "web/HttpSession.hpp" -#include "web/SslHttpSession.hpp" -#include "web/dosguard/DOSGuardInterface.hpp" -#include "web/interface/Concepts.hpp" -#include "web/ng/impl/ServerSslContext.hpp" +#include "web/Connection.hpp" +#include "web/MessageHandler.hpp" +#include "web/ProcessingPolicy.hpp" +#include "web/Response.hpp" +#include "web/impl/ConnectionHandler.hpp" #include -#include #include -#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include #include -#include #include -#include #include -#include -/** - * @brief This namespace implements the web server and related components. - * - * The web server is leveraging the power of `boost::asio` with it's coroutine support thru `boost::asio::yield_context` - * and `boost::asio::spawn`. - * - * Majority of the code is based on examples that came with boost. - */ namespace web { /** - * @brief The Detector class to detect if the connection is a ssl or not. - * - * If it is an SSL connection, the Detector will pass the ownership of the socket to SslSessionType, otherwise to - * PlainSessionType. - * - * @tparam PlainSessionType The plain session type - * @tparam SslSessionType The SSL session type - * @tparam HandlerType The executor to handle the requests + * @brief A tag class for server to help identify Server in templated code. */ -template < - template class PlainSessionType, - template class SslSessionType, - SomeServerHandler HandlerType> -class Detector : public std::enable_shared_from_this> { - using std::enable_shared_from_this>::shared_from_this; - - util::Logger log_{"WebServer"}; - boost::beast::tcp_stream stream_; - std::optional> ctx_; - std::reference_wrapper tagFactory_; - std::reference_wrapper const dosGuard_; - std::shared_ptr const handler_; - boost::beast::flat_buffer buffer_; - std::shared_ptr const adminVerification_; - std::uint32_t maxWsSendingQueueSize_; - -public: - /** - * @brief Create a new detector. - * - * @param socket The socket. Ownership is transferred - * @param ctx The SSL context if any - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param adminVerification The admin verification strategy to use - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket - */ - Detector( - tcp::socket&& socket, - std::optional> ctx, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr handler, - std::shared_ptr adminVerification, - std::uint32_t maxWsSendingQueueSize - ) - : stream_(std::move(socket)) - , ctx_(ctx) - , tagFactory_(std::cref(tagFactory)) - , dosGuard_(dosGuard) - , handler_(std::move(handler)) - , adminVerification_(std::move(adminVerification)) - , maxWsSendingQueueSize_(maxWsSendingQueueSize) - { - } - - /** - * @brief A helper function that is called when any error occurs. - * - * @param ec The error code - * @param message The message to include in the log - */ - void - fail(boost::system::error_code ec, char const* message) - { - if (ec == boost::asio::ssl::error::stream_truncated) - return; - - LOG(log_.info()) << "Detector failed (" << message << "): " << ec.message(); - } - - /** @brief Initiate the detection. */ - void - run() - { - boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); - async_detect_ssl(stream_, buffer_, boost::beast::bind_front_handler(&Detector::onDetect, shared_from_this())); - } - - /** - * @brief Handles detection result. - * - * @param ec The error code - * @param result true if SSL is detected; false otherwise - */ - void - onDetect(boost::beast::error_code ec, bool result) - { - if (ec) - return fail(ec, "detect"); - - std::string ip; - try { - ip = stream_.socket().remote_endpoint().address().to_string(); - } catch (std::exception const&) { - return fail(ec, "cannot get remote endpoint"); - } - - if (result) { - if (!ctx_) - return fail(ec, "SSL is not supported by this server"); - - std::make_shared>( - stream_.release_socket(), - ip, - adminVerification_, - *ctx_, - tagFactory_, - dosGuard_, - handler_, - std::move(buffer_), - maxWsSendingQueueSize_ - ) - ->run(); - return; - } - - std::make_shared>( - stream_.release_socket(), - ip, - adminVerification_, - tagFactory_, - dosGuard_, - handler_, - std::move(buffer_), - maxWsSendingQueueSize_ - ) - ->run(); - } +struct ServerTag { + virtual ~ServerTag() = default; }; -/** - * @brief The WebServer class. It creates server socket and start listening on it. - * - * Once there is client connection, it will accept it and pass the socket to Detector to detect ssl or plain. - * - * @tparam PlainSessionType The plain session to handle non-ssl connection. - * @tparam SslSessionType The SSL session to handle SSL connection. - * @tparam HandlerType The handler to process the request and return response. - */ -template < - template class PlainSessionType, - template class SslSessionType, - SomeServerHandler HandlerType> -class Server : public std::enable_shared_from_this> { - using std::enable_shared_from_this>::shared_from_this; +template +concept SomeServer = std::derived_from; +/** + * @brief Web server class. + */ +class Server : public ServerTag { +public: + /** + * @brief Check to perform for each new client connection. The check takes client ip as input and returns a Response + * if the check failed. Response will be sent to the client and the connection will be closed. + */ + using OnConnectCheck = std::function(Connection const&)>; + + /** + * @brief Hook called when any connection disconnects + */ + using OnDisconnectHook = impl::ConnectionHandler::OnDisconnectHook; + +private: util::Logger log_{"WebServer"}; - std::reference_wrapper ioc_; - std::optional ctx_; - util::TagDecoratorFactory tagFactory_; - std::reference_wrapper dosGuard_; - std::shared_ptr handler_; - tcp::acceptor acceptor_; - std::shared_ptr adminVerification_; - std::uint32_t maxWsSendingQueueSize_; + util::Logger perfLog_{"Performance"}; + + std::reference_wrapper ctx_; + std::optional sslContext_; + + util::TagDecoratorFactory tagDecoratorFactory_; + + impl::ConnectionHandler connectionHandler_; + boost::asio::ip::tcp::endpoint endpoint_; + + OnConnectCheck onConnectCheck_; + + bool running_{false}; public: /** - * @brief Create a new instance of the web server. + * @brief Construct a new Server object. * - * @param ioc The io_context to run the server on - * @param ctx The SSL context if any - * @param endpoint The endpoint to listen on - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param adminVerification The admin verification strategy to use - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket + * @param ctx The boost::asio::io_context to use. + * @param endpoint The endpoint to listen on. + * @param sslContext The SSL context to use (optional). + * @param processingPolicy The requests processing policy (parallel or sequential). + * @param parallelRequestLimit The limit of requests for one connection that can be processed in parallel. Only used + * if processingPolicy is parallel. + * @param tagDecoratorFactory The tag decorator factory. + * @param maxSubscriptionSendQueueSize The maximum size of the subscription send queue. + * @param onConnectCheck The check to perform on each connection. + * @param onDisconnectHook The hook to call on each disconnection. */ Server( - boost::asio::io_context& ioc, - std::optional ctx, - tcp::endpoint endpoint, - util::TagDecoratorFactory tagFactory, - dosguard::DOSGuardInterface& dosGuard, - std::shared_ptr handler, - std::shared_ptr adminVerification, - std::uint32_t maxWsSendingQueueSize - ) - : ioc_(std::ref(ioc)) - , ctx_(std::move(ctx)) - , tagFactory_(tagFactory) - , dosGuard_(std::ref(dosGuard)) - , handler_(std::move(handler)) - , acceptor_(boost::asio::make_strand(ioc)) - , adminVerification_(std::move(adminVerification)) - , maxWsSendingQueueSize_(maxWsSendingQueueSize) - { - boost::beast::error_code ec; + boost::asio::io_context& ctx, + boost::asio::ip::tcp::endpoint endpoint, + std::optional sslContext, + ProcessingPolicy processingPolicy, + std::optional parallelRequestLimit, + util::TagDecoratorFactory tagDecoratorFactory, + std::optional maxSubscriptionSendQueueSize, + OnConnectCheck onConnectCheck, + OnDisconnectHook onDisconnectHook + ); - acceptor_.open(endpoint.protocol(), ec); - if (ec) - return; + /** + * @brief Copy constructor is deleted. The Server couldn't be copied. + */ + Server(Server const&) = delete; - acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); - if (ec) - return; + /** + * @brief Move constructor is deleted because connectionHandler_ contains references to some fields of the Server. + */ + Server(Server&&) = delete; - acceptor_.bind(endpoint, ec); - if (ec) { - LOG(log_.error()) << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message(); - throw std::runtime_error( - fmt::format("Failed to bind to endpoint: {}:{}", endpoint.address().to_string(), endpoint.port()) - ); - } - - acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); - if (ec) { - LOG(log_.error()) << "Failed to listen at endpoint: " << endpoint << ". message: " << ec.message(); - throw std::runtime_error( - fmt::format("Failed to listen at endpoint: {}:{}", endpoint.address().to_string(), endpoint.port()) - ); - } - } - - /** @brief Start accepting incoming connections. */ + /** + * @brief Set handler for GET requests. + * @note This method can't be called after run() is called. + * + * @param target The target of the request. + * @param handler The handler to set. + */ void - run() - { - doAccept(); - } + onGet(std::string const& target, MessageHandler handler); + + /** + * @brief Set handler for POST requests. + * @note This method can't be called after run() is called. + * + * @param target The target of the request. + * @param handler The handler to set. + */ + void + onPost(std::string const& target, MessageHandler handler); + + /** + * @brief Set handler for WebSocket requests. + * @note This method can't be called after run() is called. + * + * @param handler The handler to set. + */ + void + onWs(MessageHandler handler); + + /** + * @brief Run the server. + * + * @return std::nullopt if the server started successfully, otherwise an error message. + */ + std::optional + run(); + + /** + * @brief Stop the server. This method will asynchronously sleep unless all the users are disconnected. + * @note Stopping the server cause graceful shutdown of all connections. And rejecting new connections. + * + * @param yield The coroutine context. + */ + void + stop(boost::asio::yield_context yield); private: void - doAccept() - { - acceptor_.async_accept( - boost::asio::make_strand(ioc_.get()), - boost::beast::bind_front_handler(&Server::onAccept, shared_from_this()) - ); - } - - void - onAccept(boost::beast::error_code ec, tcp::socket socket) - { - if (!ec) { - auto ctxRef = - ctx_ ? std::optional>{ctx_.value()} : std::nullopt; - - std::make_shared>( - std::move(socket), - ctxRef, - std::cref(tagFactory_), - dosGuard_, - handler_, - adminVerification_, - maxWsSendingQueueSize_ - ) - ->run(); - } - - doAccept(); - } + handleConnection(boost::asio::ip::tcp::socket socket, boost::asio::yield_context yield); }; -/** @brief The final type of the HttpServer used by Clio. */ -template -using HttpServer = Server; - /** - * @brief A factory function that spawns a ready to use HTTP server. + * @brief Create a new Server. * - * @tparam HandlerType The type of handler to process the request - * @param config The config to create server - * @param ioc The server will run under this io_context - * @param dosGuard The dos guard to protect the server - * @param handler The handler to process the request - * @return The server instance + * @param config The configuration. + * @param onConnectCheck The check to perform on each client connection. + * @param onDisconnectHook The hook to call when client disconnects. + * @param context The boost::asio::io_context to use. + * + * @return The Server or an error message. */ -template -static std::shared_ptr> -makeHttpServer( +std::expected +makeServer( util::config::ClioConfigDefinition const& config, - boost::asio::io_context& ioc, - dosguard::DOSGuardInterface& dosGuard, - std::shared_ptr const& handler -) -{ - static util::Logger const log{"WebServer"}; // NOLINT(readability-identifier-naming) - - auto expectedSslContext = ng::impl::makeServerSslContext(config); - if (not expectedSslContext) { - LOG(log.error()) << "Failed to create SSL context: " << expectedSslContext.error(); - return nullptr; - } - - auto const serverConfig = config.getObject("server"); - auto const address = boost::asio::ip::make_address(serverConfig.get("ip")); - auto const port = serverConfig.get("port"); - - auto expectedAdminVerification = makeAdminVerificationStrategy(config); - if (not expectedAdminVerification.has_value()) { - LOG(log.error()) << expectedAdminVerification.error(); - throw std::logic_error{expectedAdminVerification.error()}; - } - - // If the transactions number is 200 per ledger, A client which subscribes everything will send 400+ feeds for - // each ledger. we allow user delay 3 ledgers by default - auto const maxWsSendingQueueSize = serverConfig.get("ws_max_sending_queue_size"); - - auto server = std::make_shared>( - ioc, - std::move(expectedSslContext).value(), - boost::asio::ip::tcp::endpoint{address, port}, - util::TagDecoratorFactory(config), - dosGuard, - handler, - std::move(expectedAdminVerification).value(), - maxWsSendingQueueSize - ); - - server->run(); - return server; -} + Server::OnConnectCheck onConnectCheck, + Server::OnDisconnectHook onDisconnectHook, + boost::asio::io_context& context +); } // namespace web diff --git a/src/web/SslHttpSession.hpp b/src/web/SslHttpSession.hpp deleted file mode 100644 index 43ee85e0..00000000 --- a/src/web/SslHttpSession.hpp +++ /dev/null @@ -1,188 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#pragma once - -#include "util/Taggable.hpp" -#include "web/AdminVerificationStrategy.hpp" -#include "web/SslWsSession.hpp" -#include "web/dosguard/DOSGuardInterface.hpp" -#include "web/impl/HttpBase.hpp" -#include "web/interface/Concepts.hpp" -#include "web/interface/ConnectionBase.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace web { - -using tcp = boost::asio::ip::tcp; - -/** - * @brief Represents a HTTPS connection established by a client. - * - * It will handle the upgrade to secure websocket, pass the ownership of the socket to the upgrade session. - * Otherwise, it will pass control to the base class. - * - * @tparam HandlerType The type of the server handler to use - */ -template -class SslHttpSession : public impl::HttpBase, - public std::enable_shared_from_this> { - boost::beast::ssl_stream stream_; - std::reference_wrapper tagFactory_; - std::uint32_t maxWsSendingQueueSize_; - -public: - /** - * @brief Create a new SSL session. - * - * @param socket The socket. Ownership is transferred to HttpSession - * @param ip Client's IP address - * @param adminVerification The admin verification strategy to use - * @param ctx The SSL context - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param buffer Buffer with initial data received from the peer - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket - */ - explicit SslHttpSession( - tcp::socket&& socket, - std::string const& ip, - std::shared_ptr const& adminVerification, - boost::asio::ssl::context& ctx, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr const& handler, - boost::beast::flat_buffer buffer, - std::uint32_t maxWsSendingQueueSize - ) - : impl::HttpBase( - ip, - tagFactory, - adminVerification, - dosGuard, - handler, - std::move(buffer) - ) - , stream_(std::move(socket), ctx) - , tagFactory_(tagFactory) - , maxWsSendingQueueSize_(maxWsSendingQueueSize) - { - } - - ~SslHttpSession() override = default; - - /** @return The SSL stream. */ - boost::beast::ssl_stream& - stream() - { - return stream_; - } - - /** @brief Initiates the handshake. */ - void - run() - { - auto self = this->shared_from_this(); - boost::asio::dispatch(stream_.get_executor(), [self]() { - // Set the timeout. - boost::beast::get_lowest_layer(self->stream()).expires_after(std::chrono::seconds(30)); - - // Perform the SSL handshake - // Note, this is the buffered version of the handshake. - self->stream_.async_handshake( - boost::asio::ssl::stream_base::server, - self->buffer_.data(), - boost::beast::bind_front_handler(&SslHttpSession::onHandshake, self) - ); - }); - } - - /** - * @brief Handles the handshake. - * - * @param ec Error code if any - * @param bytesUsed The total amount of data read from the stream - */ - void - onHandshake(boost::beast::error_code ec, std::size_t bytesUsed) - { - if (ec) - return this->httpFail(ec, "handshake"); - - this->buffer_.consume(bytesUsed); - this->doRead(); - } - - /** @brief Closes the underlying connection. */ - void - doClose() - { - boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); - stream_.async_shutdown(boost::beast::bind_front_handler(&SslHttpSession::onShutdown, this->shared_from_this())); - } - - /** - * @brief Handles a connection shutdown. - * - * @param ec Error code if any - */ - void - onShutdown(boost::beast::error_code ec) - { - if (ec) - return this->httpFail(ec, "shutdown"); - // At this point the connection is closed gracefully - } - - /** @brief Upgrades connection to secure websocket. */ - void - upgrade() - { - std::make_shared>( - std::move(stream_), - this->clientIp, - tagFactory_, - this->dosGuard_, - this->handler_, - std::move(this->buffer_), - std::move(this->req_), - ConnectionBase::isAdmin(), - maxWsSendingQueueSize_ - ) - ->run(); - } -}; -} // namespace web diff --git a/src/web/SslWsSession.hpp b/src/web/SslWsSession.hpp deleted file mode 100644 index d5b317db..00000000 --- a/src/web/SslWsSession.hpp +++ /dev/null @@ -1,208 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#pragma once - -#include "util/Taggable.hpp" -#include "web/dosguard/DOSGuardInterface.hpp" -#include "web/impl/WsBase.hpp" -#include "web/interface/ConnectionBase.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace web { - -/** - * @brief Represents a secure websocket session. - * - * Majority of the operations are handled by the base class. - */ -template -class SslWsSession : public impl::WsBase { - using StreamType = boost::beast::websocket::stream>; - StreamType ws_; - -public: - /** - * @brief Create a new non-secure websocket session. - * - * @param stream The SSL stream. Ownership is transferred - * @param ip Client's IP address - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param buffer Buffer with initial data received from the peer - * @param isAdmin Whether the connection has admin privileges - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket - */ - explicit SslWsSession( - boost::beast::ssl_stream&& stream, - std::string ip, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr const& handler, - boost::beast::flat_buffer&& buffer, - bool isAdmin, - std::uint32_t maxWsSendingQueueSize - ) - : impl::WsBase( - ip, - tagFactory, - dosGuard, - handler, - std::move(buffer), - maxWsSendingQueueSize - ) - , ws_(std::move(stream)) - { - ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer) - } - - /** @return The secure websocket stream. */ - StreamType& - ws() - { - return ws_; - } -}; - -/** - * @brief The HTTPS upgrader class, upgrade from an HTTPS session to a secure websocket session. - * - * Pass the stream to the session class after upgrade. - */ -template -class SslWsUpgrader : public std::enable_shared_from_this> { - using std::enable_shared_from_this>::shared_from_this; - - boost::beast::ssl_stream https_; - boost::optional> parser_; - boost::beast::flat_buffer buffer_; - std::string ip_; - std::reference_wrapper tagFactory_; - std::reference_wrapper dosGuard_; - std::shared_ptr const handler_; - http::request req_; - bool isAdmin_; - std::uint32_t maxWsSendingQueueSize_; - -public: - /** - * @brief Create a new upgrader to secure websocket. - * - * @param stream The SSL stream. Ownership is transferred - * @param ip Client's IP address - * @param tagFactory A factory that is used to generate tags to track requests and sessions - * @param dosGuard The denial of service guard to use - * @param handler The server handler to use - * @param buffer Buffer with initial data received from the peer. Ownership is transferred - * @param request The request. Ownership is transferred - * @param isAdmin Whether the connection has admin privileges - * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket - */ - SslWsUpgrader( - boost::beast::ssl_stream stream, - std::string ip, - std::reference_wrapper tagFactory, - std::reference_wrapper dosGuard, - std::shared_ptr handler, - boost::beast::flat_buffer&& buffer, - http::request request, - bool isAdmin, - std::uint32_t maxWsSendingQueueSize - ) - : https_(std::move(stream)) - , buffer_(std::move(buffer)) - , ip_(std::move(ip)) - , tagFactory_(tagFactory) - , dosGuard_(dosGuard) - , handler_(std::move(handler)) - , req_(std::move(request)) - , isAdmin_(isAdmin) - , maxWsSendingQueueSize_(maxWsSendingQueueSize) - { - } - - ~SslWsUpgrader() = default; - - /** @brief Initiate the upgrade. */ - void - run() - { - boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30)); - - boost::asio::dispatch( - https_.get_executor(), - boost::beast::bind_front_handler(&SslWsUpgrader::doUpgrade, shared_from_this()) - ); - } - -private: - void - doUpgrade() - { - parser_.emplace(); - - // Apply a reasonable limit to the allowed size of the body in bytes to prevent abuse. - static constexpr auto kMAX_BODY_SIZE = 10000; - parser_->body_limit(kMAX_BODY_SIZE); - - boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30)); - onUpgrade(); - } - - void - onUpgrade() - { - if (!boost::beast::websocket::is_upgrade(req_)) - return; - - // Disable the timeout. The websocket::stream uses its own timeout settings. - boost::beast::get_lowest_layer(https_).expires_never(); - - std::make_shared>( - std::move(https_), - ip_, - tagFactory_, - dosGuard_, - handler_, - std::move(buffer_), - isAdmin_, - maxWsSendingQueueSize_ - ) - ->run(std::move(req_)); - } -}; -} // namespace web diff --git a/src/web/SubscriptionContext.cpp b/src/web/SubscriptionContext.cpp index d831d1a5..12dc1676 100644 --- a/src/web/SubscriptionContext.cpp +++ b/src/web/SubscriptionContext.cpp @@ -21,10 +21,14 @@ #include "util/Taggable.hpp" #include "web/SubscriptionContextInterface.hpp" -#include "web/interface/ConnectionBase.hpp" +#include +#include + +#include #include #include +#include #include #include @@ -32,22 +36,39 @@ namespace web { SubscriptionContext::SubscriptionContext( util::TagDecoratorFactory const& factory, - std::shared_ptr connection + impl::WsConnectionBase& connection, + std::optional maxSendQueueSize, + boost::asio::yield_context yield, + ErrorHandler errorHandler ) - : SubscriptionContextInterface{factory}, connection_{connection} + : web::SubscriptionContextInterface(factory) + , connection_(connection) + , maxSendQueueSize_(maxSendQueueSize) + , tasksGroup_(yield) + , yield_(yield) + , errorHandler_(std::move(errorHandler)) { } -SubscriptionContext::~SubscriptionContext() -{ - onDisconnect_(this); -} - void SubscriptionContext::send(std::shared_ptr message) { - if (auto connection = connection_.lock(); connection != nullptr) - connection->send(std::move(message)); + if (disconnected_) + return; + + if (maxSendQueueSize_.has_value() and tasksGroup_.size() >= *maxSendQueueSize_) { + tasksGroup_.spawn(yield_, [this](boost::asio::yield_context innerYield) { + connection_.get().close(innerYield); + }); + disconnected_ = true; + return; + } + + tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) { + auto const maybeError = connection_.get().sendBuffer(boost::asio::buffer(*message), innerYield); + if (maybeError.has_value() and errorHandler_(*maybeError, connection_)) + connection_.get().close(innerYield); + }); } void @@ -59,13 +80,21 @@ SubscriptionContext::onDisconnect(OnDisconnectSlot const& slot) void SubscriptionContext::setApiSubversion(uint32_t value) { - apiSubVersion_ = value; + apiSubversion_ = value; } uint32_t SubscriptionContext::apiSubversion() const { - return apiSubVersion_; + return apiSubversion_; +} + +void +SubscriptionContext::disconnect(boost::asio::yield_context yield) +{ + onDisconnect_(this); + disconnected_ = true; + tasksGroup_.asyncWait(yield); } } // namespace web diff --git a/src/web/SubscriptionContext.hpp b/src/web/SubscriptionContext.hpp index 2c64fa43..863c9f68 100644 --- a/src/web/SubscriptionContext.hpp +++ b/src/web/SubscriptionContext.hpp @@ -19,32 +19,55 @@ #pragma once +#include "util/CoroutineGroup.hpp" #include "util/Taggable.hpp" +#include "web/Connection.hpp" +#include "web/Error.hpp" #include "web/SubscriptionContextInterface.hpp" -#include "web/interface/Concepts.hpp" -#include "web/interface/ConnectionBase.hpp" +#include "web/impl/WsConnection.hpp" +#include +#include #include #include +#include #include +#include #include +#include #include namespace web { /** - * @brief A context of a WsBase connection for subscriptions. + * @brief Implementation of SubscriptionContextInterface. + * @note This class is designed to be used with SubscriptionManager. The class is safe to use from multiple threads. + * The method disconnect() must be called before the object is destroyed. */ -class SubscriptionContext : public SubscriptionContextInterface { - std::weak_ptr connection_; +class SubscriptionContext : public web::SubscriptionContextInterface { +public: + /** + * @brief Error handler definition. Error handler returns true if connection should be closed false otherwise. + */ + using ErrorHandler = std::function; + +private: + std::reference_wrapper connection_; + std::optional maxSendQueueSize_; + util::CoroutineGroup tasksGroup_; + boost::asio::yield_context yield_; + ErrorHandler errorHandler_; + boost::signals2::signal onDisconnect_; + std::atomic_bool disconnected_{false}; + /** * @brief The API version of the web stream client. * This is used to track the api version of this connection, which mainly is used by subscription. It is different * from the api version in Context, which is only used for the current request. */ - std::atomic_uint32_t apiSubVersion_ = 0; + std::atomic_uint32_t apiSubversion_ = 0u; public: /** @@ -52,17 +75,21 @@ public: * * @param factory The tag decorator factory to use to init taggable. * @param connection The connection for which the context is created. + * @param maxSendQueueSize The maximum size of the send queue. If the queue is full, the connection will be closed. + * @param yield The yield context to spawn sending coroutines. + * @param errorHandler The error handler. */ - SubscriptionContext(util::TagDecoratorFactory const& factory, std::shared_ptr connection); - - /** - * @brief Destroy the Subscription Context object - */ - ~SubscriptionContext() override; + SubscriptionContext( + util::TagDecoratorFactory const& factory, + impl::WsConnectionBase& connection, + std::optional maxSendQueueSize, + boost::asio::yield_context yield, + ErrorHandler errorHandler + ); /** * @brief Send message to the client - * @note This method will not do anything if the related connection got disconnected. + * @note This method does nothing after disconnected() was called. * * @param message The message to send. */ @@ -91,6 +118,15 @@ public: */ uint32_t apiSubversion() const override; + + /** + * @brief Notify the context that related connection is disconnected and wait for all the task to complete. + * @note This method must be called before the object is destroyed. + * + * @param yield The yield context to wait for all the tasks to complete. + */ + void + disconnect(boost::asio::yield_context yield); }; } // namespace web diff --git a/src/web/ng/impl/Concepts.hpp b/src/web/impl/Concepts.hpp similarity index 96% rename from src/web/ng/impl/Concepts.hpp rename to src/web/impl/Concepts.hpp index 7c0985d4..5b27189f 100644 --- a/src/web/ng/impl/Concepts.hpp +++ b/src/web/impl/Concepts.hpp @@ -24,7 +24,7 @@ #include -namespace web::ng::impl { +namespace web::impl { template concept IsTcpStream = std::is_same_v, boost::beast::tcp_stream>; @@ -32,4 +32,4 @@ concept IsTcpStream = std::is_same_v, boost::beast::tcp_stream>; template concept IsSslTcpStream = std::is_same_v, boost::asio::ssl::stream>; -} // namespace web::ng::impl +} // namespace web::impl diff --git a/src/web/ng/impl/ConnectionHandler.cpp b/src/web/impl/ConnectionHandler.cpp similarity index 97% rename from src/web/ng/impl/ConnectionHandler.cpp rename to src/web/impl/ConnectionHandler.cpp index 838c4bfb..f6e08e2f 100644 --- a/src/web/ng/impl/ConnectionHandler.cpp +++ b/src/web/impl/ConnectionHandler.cpp @@ -17,20 +17,20 @@ */ //============================================================================== -#include "web/ng/impl/ConnectionHandler.hpp" +#include "web/impl/ConnectionHandler.hpp" #include "util/Assert.hpp" #include "util/CoroutineGroup.hpp" #include "util/Taggable.hpp" #include "util/log/Logger.hpp" +#include "web/Connection.hpp" +#include "web/Error.hpp" +#include "web/MessageHandler.hpp" +#include "web/ProcessingPolicy.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" +#include "web/SubscriptionContext.hpp" #include "web/SubscriptionContextInterface.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/Error.hpp" -#include "web/ng/MessageHandler.hpp" -#include "web/ng/ProcessingPolicy.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" -#include "web/ng/SubscriptionContext.hpp" #include #include @@ -47,7 +47,7 @@ #include #include -namespace web::ng::impl { +namespace web::impl { namespace { @@ -387,4 +387,4 @@ ConnectionHandler::handleRequest( } } -} // namespace web::ng::impl +} // namespace web::impl diff --git a/src/web/ng/impl/ConnectionHandler.hpp b/src/web/impl/ConnectionHandler.hpp similarity index 95% rename from src/web/ng/impl/ConnectionHandler.hpp rename to src/web/impl/ConnectionHandler.hpp index e21b07bd..1c8f492d 100644 --- a/src/web/ng/impl/ConnectionHandler.hpp +++ b/src/web/impl/ConnectionHandler.hpp @@ -26,13 +26,13 @@ #include "util/prometheus/Gauge.hpp" #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" +#include "web/Connection.hpp" +#include "web/Error.hpp" +#include "web/MessageHandler.hpp" +#include "web/ProcessingPolicy.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include "web/SubscriptionContextInterface.hpp" -#include "web/ng/Connection.hpp" -#include "web/ng/Error.hpp" -#include "web/ng/MessageHandler.hpp" -#include "web/ng/ProcessingPolicy.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" #include #include @@ -47,7 +47,7 @@ #include #include -namespace web::ng::impl { +namespace web::impl { class ConnectionHandler { public: @@ -161,4 +161,4 @@ private: ); }; -} // namespace web::ng::impl +} // namespace web::impl diff --git a/src/web/ng/impl/ErrorHandling.cpp b/src/web/impl/ErrorHandling.cpp similarity index 97% rename from src/web/ng/impl/ErrorHandling.cpp rename to src/web/impl/ErrorHandling.cpp index 6e9a0540..785d98d7 100644 --- a/src/web/ng/impl/ErrorHandling.cpp +++ b/src/web/impl/ErrorHandling.cpp @@ -17,13 +17,13 @@ */ //============================================================================== -#include "web/ng/impl/ErrorHandling.hpp" +#include "web/impl/ErrorHandling.hpp" #include "rpc/Errors.hpp" #include "rpc/JS.hpp" #include "util/Assert.hpp" -#include "web/ng/Request.hpp" -#include "web/ng/Response.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include #include @@ -37,7 +37,7 @@ namespace http = boost::beast::http; -namespace web::ng::impl { +namespace web::impl { namespace { @@ -161,4 +161,4 @@ ErrorHelper::composeError(rpc::RippledError error) const return composeErrorImpl(error, rawRequest_, request_); } -} // namespace web::ng::impl +} // namespace web::impl diff --git a/src/web/impl/ErrorHandling.hpp b/src/web/impl/ErrorHandling.hpp index d200810c..1e5c650c 100644 --- a/src/web/impl/ErrorHandling.hpp +++ b/src/web/impl/ErrorHandling.hpp @@ -1,7 +1,7 @@ //------------------------------------------------------------------------------ /* This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. + Copyright (c) 2024, the clio developers. Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above @@ -20,9 +20,8 @@ #pragma once #include "rpc/Errors.hpp" -#include "rpc/JS.hpp" -#include "util/Assert.hpp" -#include "web/interface/ConnectionBase.hpp" +#include "web/Request.hpp" +#include "web/Response.hpp" #include #include @@ -31,11 +30,8 @@ #include #include -#include +#include #include -#include -#include -#include namespace web::impl { @@ -43,137 +39,76 @@ namespace web::impl { * @brief A helper that attempts to match rippled reporting mode HTTP errors as close as possible. */ class ErrorHelper { - std::shared_ptr connection_; + std::reference_wrapper rawRequest_; std::optional request_; public: - ErrorHelper( - std::shared_ptr const& connection, - std::optional request = std::nullopt - ) - : connection_{connection}, request_{std::move(request)} - { - } + /** + * @brief Construct a new Error Helper object + * + * @param rawRequest The request that caused the error. + * @param request The parsed request that caused the error. + */ + ErrorHelper(Request const& rawRequest, std::optional request = std::nullopt); - void - sendError(rpc::Status const& err) const - { - if (connection_->upgraded) { - connection_->send(boost::json::serialize(composeError(err))); - } else { - // Note: a collection of crutches to match rippled output follows - if (auto const clioCode = std::get_if(&err.code)) { - switch (*clioCode) { - case rpc::ClioError::RpcInvalidApiVersion: - connection_->send( - std::string{rpc::getErrorInfo(*clioCode).error}, boost::beast::http::status::bad_request - ); - break; - case rpc::ClioError::RpcCommandIsMissing: - connection_->send("Null method", boost::beast::http::status::bad_request); - break; - case rpc::ClioError::RpcCommandIsEmpty: - connection_->send("method is empty", boost::beast::http::status::bad_request); - break; - case rpc::ClioError::RpcCommandNotString: - connection_->send("method is not string", boost::beast::http::status::bad_request); - break; - case rpc::ClioError::RpcParamsUnparsable: - connection_->send("params unparsable", boost::beast::http::status::bad_request); - break; + /** + * @brief Make an error response from a status. + * + * @param err The status to make an error response from. + * @return + */ + [[nodiscard]] Response + makeError(rpc::Status const& err) const; - // others are not applicable but we want a compilation error next time we add one - case rpc::ClioError::RpcUnknownOption: - case rpc::ClioError::RpcMalformedCurrency: - case rpc::ClioError::RpcMalformedRequest: - case rpc::ClioError::RpcMalformedOwner: - case rpc::ClioError::RpcMalformedAddress: - case rpc::ClioError::RpcFieldNotFoundTransaction: - case rpc::ClioError::RpcMalformedOracleDocumentId: - case rpc::ClioError::RpcMalformedAuthorizedCredentials: - case rpc::ClioError::EtlConnectionError: - case rpc::ClioError::EtlRequestError: - case rpc::ClioError::EtlRequestTimeout: - case rpc::ClioError::EtlInvalidResponse: - ASSERT( - false, "Unknown rpc error code {}", static_cast(*clioCode) - ); // this should never happen - break; - } - } else { - connection_->send(boost::json::serialize(composeError(err)), boost::beast::http::status::bad_request); - } - } - } + /** + * @brief Make an internal error response. + * + * @return A response with an internal error. + */ + [[nodiscard]] Response + makeInternalError() const; - void - sendInternalError() const - { - connection_->send( - boost::json::serialize(composeError(rpc::RippledError::rpcINTERNAL)), - boost::beast::http::status::internal_server_error - ); - } + /** + * @brief Make a response for when the server is not ready. + * + * @return A response with a not ready error. + */ + [[nodiscard]] Response + makeNotReadyError() const; - void - sendNotReadyError() const - { - connection_->send( - boost::json::serialize(composeError(rpc::RippledError::rpcNOT_READY)), boost::beast::http::status::ok - ); - } + /** + * @brief Make a response for when the server is too busy. + * + * @return A response with a too busy error. + */ + [[nodiscard]] Response + makeTooBusyError() const; - void - sendTooBusyError() const - { - if (connection_->upgraded) { - connection_->send( - boost::json::serialize(rpc::makeError(rpc::RippledError::rpcTOO_BUSY)), boost::beast::http::status::ok - ); - } else { - connection_->send( - boost::json::serialize(rpc::makeError(rpc::RippledError::rpcTOO_BUSY)), - boost::beast::http::status::service_unavailable - ); - } - } + /** + * @brief Make a response when json parsing fails. + * + * @return A response with a json parsing error. + */ + [[nodiscard]] Response + makeJsonParsingError() const; - void - sendJsonParsingError() const - { - if (connection_->upgraded) { - connection_->send(boost::json::serialize(rpc::makeError(rpc::RippledError::rpcBAD_SYNTAX))); - } else { - connection_->send( - fmt::format("Unable to parse JSON from the request"), boost::beast::http::status::bad_request - ); - } - } + /** + * @brief Compose an error into json object from a status. + * + * @param error The status to compose into a json object. + * @return The composed json object. + */ + [[nodiscard]] boost::json::object + composeError(rpc::Status const& error) const; - boost::json::object - composeError(auto const& error) const - { - auto e = rpc::makeError(error); - - if (request_) { - auto const appendFieldIfExist = [&](auto const& field) { - if (request_->contains(field) and not request_->at(field).is_null()) - e[field] = request_->at(field); - }; - - appendFieldIfExist(JS(id)); - - if (connection_->upgraded) - appendFieldIfExist(JS(api_version)); - - e[JS(request)] = request_.value(); - } - - if (connection_->upgraded) { - return e; - } - return {{JS(result), e}}; - } + /** + * @brief Compose an error into json object from a rippled error. + * + * @param error The rippled error to compose into a json object. + * @return The composed json object. + */ + [[nodiscard]] boost::json::object + composeError(rpc::RippledError error) const; }; } // namespace web::impl diff --git a/src/web/impl/HttpBase.hpp b/src/web/impl/HttpBase.hpp deleted file mode 100644 index 5f3fa26c..00000000 --- a/src/web/impl/HttpBase.hpp +++ /dev/null @@ -1,327 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2023, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#pragma once - -#include "rpc/Errors.hpp" -#include "util/Assert.hpp" -#include "util/Taggable.hpp" -#include "util/build/Build.hpp" -#include "util/log/Logger.hpp" -#include "util/prometheus/Http.hpp" -#include "web/AdminVerificationStrategy.hpp" -#include "web/SubscriptionContextInterface.hpp" -#include "web/dosguard/DOSGuardInterface.hpp" -#include "web/interface/Concepts.hpp" -#include "web/interface/ConnectionBase.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace web::impl { - -static auto constexpr kHEALTH_CHECK_HTML = R"html( - - - Test page for Clio -

Clio Test

This page shows Clio http(s) connectivity is working.

- -)html"; - -using tcp = boost::asio::ip::tcp; - -/** - * @brief This is the implementation class for http sessions - * - * @tparam Derived The derived class - * @tparam HandlerType The handler class, will be called when a request is received. - */ -template