From 1c82d379d9716ceaff420d7744af6cd5eeb49f51 Mon Sep 17 00:00:00 2001 From: cyan317 <120398799+cindyyan317@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:30:52 +0100 Subject: [PATCH] fix: Add queue size limit for websocket (#1701) For slow clients, we will disconnect with it if the message queue is too long. --------- Co-authored-by: Sergey Kuznetsov --- docs/examples/config/example-config.json | 6 +- src/web/HttpSession.hpp | 10 +++- src/web/PlainWsSession.hpp | 30 ++++++++-- src/web/Server.hpp | 40 +++++++++++-- src/web/SslHttpSession.hpp | 10 +++- src/web/SslWsSession.hpp | 29 ++++++++-- src/web/impl/WsBase.hpp | 32 +++++------ tests/unit/web/ServerTests.cpp | 71 +++++------------------- 8 files changed, 132 insertions(+), 96 deletions(-) diff --git a/docs/examples/config/example-config.json b/docs/examples/config/example-config.json index 6f3ff0c9..57969672 100644 --- a/docs/examples/config/example-config.json +++ b/docs/examples/config/example-config.json @@ -75,9 +75,9 @@ // For sequent policy request from one client connection will be processed one by one and the next one will not be read before // the previous one is processed. For parallel policy Clio will take all requests and process them in parallel and // send a reply for each request whenever it is ready. - "parallel_requests_limit": 10 // Optional parameter, used only if "processing_strategy" is "parallel". - It limits the number of requests for one client connection processed in parallel. Infinite if not specified. - + "parallel_requests_limit": 10, // Optional parameter, used only if "processing_strategy" is "parallel". It limits the number of requests for one client connection processed in parallel. Infinite if not specified. + // Max number of responses to queue up before sent successfully. If a client's waiting queue is too long, the server will close the connection. + "ws_max_sending_queue_size": 1500 }, // Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet. "graceful_period": 10.0, diff --git a/src/web/HttpSession.hpp b/src/web/HttpSession.hpp index 0f01da37..fa4ab057 100644 --- a/src/web/HttpSession.hpp +++ b/src/web/HttpSession.hpp @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -52,6 +53,7 @@ class HttpSession : public impl::HttpBase, public std::enable_shared_from_this> { boost::beast::tcp_stream stream_; std::reference_wrapper tagFactory_; + std::uint32_t maxWsSendingQueueSize_; public: /** @@ -64,6 +66,7 @@ public: * @param dosGuard The denial of service guard to use * @param handler The server handler to use * @param buffer Buffer with initial data received from the peer + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ explicit HttpSession( tcp::socket&& socket, @@ -72,7 +75,8 @@ public: std::reference_wrapper tagFactory, std::reference_wrapper dosGuard, std::shared_ptr const& handler, - boost::beast::flat_buffer buffer + boost::beast::flat_buffer buffer, + std::uint32_t maxWsSendingQueueSize ) : impl::HttpBase( ip, @@ -84,6 +88,7 @@ public: ) , stream_(std::move(socket)) , tagFactory_(tagFactory) + , maxWsSendingQueueSize_(maxWsSendingQueueSize) { } @@ -128,7 +133,8 @@ public: this->handler_, std::move(this->buffer_), std::move(this->req_), - ConnectionBase::isAdmin() + ConnectionBase::isAdmin(), + maxWsSendingQueueSize_ ) ->run(); } diff --git a/src/web/PlainWsSession.hpp b/src/web/PlainWsSession.hpp index 1d2da951..91e294da 100644 --- a/src/web/PlainWsSession.hpp +++ b/src/web/PlainWsSession.hpp @@ -62,7 +62,8 @@ public: * @param dosGuard The denial of service guard to use * @param handler The server handler to use * @param buffer Buffer with initial data received from the peer - * @param isAdmin Whether the connection has admin privileges + * @param isAdmin Whether the connection has admin privileges, + * @param maxSendingQueueSize The maximum size of the sending queue for websocket */ explicit PlainWsSession( boost::asio::ip::tcp::socket&& socket, @@ -71,9 +72,17 @@ public: std::reference_wrapper dosGuard, std::shared_ptr const& handler, boost::beast::flat_buffer&& buffer, - bool isAdmin + bool isAdmin, + std::uint32_t maxSendingQueueSize ) - : impl::WsBase(ip, tagFactory, dosGuard, handler, std::move(buffer)) + : impl::WsBase( + ip, + tagFactory, + dosGuard, + handler, + std::move(buffer), + maxSendingQueueSize + ) , ws_(std::move(socket)) { ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer) @@ -107,6 +116,7 @@ class WsUpgrader : public std::enable_shared_from_this> std::string ip_; std::shared_ptr const handler_; bool isAdmin_; + std::uint32_t maxWsSendingQueueSize_; public: /** @@ -120,6 +130,7 @@ public: * @param buffer Buffer with initial data received from the peer. Ownership is transferred * @param request The request. Ownership is transferred * @param isAdmin Whether the connection has admin privileges + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ WsUpgrader( boost::beast::tcp_stream&& stream, @@ -129,7 +140,8 @@ public: std::shared_ptr const& handler, boost::beast::flat_buffer&& buffer, http::request request, - bool isAdmin + bool isAdmin, + std::uint32_t maxWsSendingQueueSize ) : http_(std::move(stream)) , buffer_(std::move(buffer)) @@ -139,6 +151,7 @@ public: , ip_(std::move(ip)) , handler_(handler) , isAdmin_(isAdmin) + , maxWsSendingQueueSize_(maxWsSendingQueueSize) { } @@ -175,7 +188,14 @@ private: boost::beast::get_lowest_layer(http_).expires_never(); std::make_shared>( - http_.release_socket(), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_), isAdmin_ + http_.release_socket(), + ip_, + tagFactory_, + dosGuard_, + handler_, + std::move(buffer_), + isAdmin_, + maxWsSendingQueueSize_ ) ->run(std::move(req_)); } diff --git a/src/web/Server.hpp b/src/web/Server.hpp index beb27ac6..7a6a3dea 100644 --- a/src/web/Server.hpp +++ b/src/web/Server.hpp @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -84,6 +85,7 @@ class Detector : public std::enable_shared_from_this const handler_; boost::beast::flat_buffer buffer_; std::shared_ptr const adminVerification_; + std::uint32_t maxWsSendingQueueSize_; public: /** @@ -95,6 +97,7 @@ public: * @param dosGuard The denial of service guard to use * @param handler The server handler to use * @param adminVerification The admin verification strategy to use + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ Detector( tcp::socket&& socket, @@ -102,7 +105,8 @@ public: std::reference_wrapper tagFactory, std::reference_wrapper dosGuard, std::shared_ptr handler, - std::shared_ptr adminVerification + std::shared_ptr adminVerification, + std::uint32_t maxWsSendingQueueSize ) : stream_(std::move(socket)) , ctx_(ctx) @@ -110,6 +114,7 @@ public: , dosGuard_(dosGuard) , handler_(std::move(handler)) , adminVerification_(std::move(adminVerification)) + , maxWsSendingQueueSize_(maxWsSendingQueueSize) { } @@ -167,14 +172,22 @@ public: tagFactory_, dosGuard_, handler_, - std::move(buffer_) + std::move(buffer_), + maxWsSendingQueueSize_ ) ->run(); return; } std::make_shared>( - stream_.release_socket(), ip, adminVerification_, tagFactory_, dosGuard_, handler_, std::move(buffer_) + stream_.release_socket(), + ip, + adminVerification_, + tagFactory_, + dosGuard_, + handler_, + std::move(buffer_), + maxWsSendingQueueSize_ ) ->run(); } @@ -204,6 +217,7 @@ class Server : public std::enable_shared_from_this handler_; tcp::acceptor acceptor_; std::shared_ptr adminVerification_; + std::uint32_t maxWsSendingQueueSize_; public: /** @@ -216,6 +230,7 @@ public: * @param dosGuard The denial of service guard to use * @param handler The server handler to use * @param adminPassword The optional password to verify admin role in requests + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ Server( boost::asio::io_context& ioc, @@ -224,7 +239,8 @@ public: util::TagDecoratorFactory tagFactory, dosguard::DOSGuardInterface& dosGuard, std::shared_ptr handler, - std::optional adminPassword + std::optional adminPassword, + std::uint32_t maxWsSendingQueueSize ) : ioc_(std::ref(ioc)) , ctx_(std::move(ctx)) @@ -233,6 +249,7 @@ public: , handler_(std::move(handler)) , acceptor_(boost::asio::make_strand(ioc)) , adminVerification_(impl::make_AdminVerificationStrategy(std::move(adminPassword))) + , maxWsSendingQueueSize_(maxWsSendingQueueSize) { boost::beast::error_code ec; @@ -286,7 +303,13 @@ private: ctx_ ? std::optional>{ctx_.value()} : std::nullopt; std::make_shared>( - std::move(socket), ctxRef, std::cref(tagFactory_), dosGuard_, handler_, adminVerification_ + std::move(socket), + ctxRef, + std::cref(tagFactory_), + dosGuard_, + handler_, + adminVerification_, + maxWsSendingQueueSize_ ) ->run(); } @@ -348,6 +371,10 @@ make_HttpServer( throw std::logic_error("Admin config error, one method must be specified to authorize admin."); } + // If the transactions number is 200 per ledger, A client which subscribes everything will send 400+ feeds for + // each ledger. we allow user delay 3 ledgers by default + auto const maxWsSendingQueueSize = serverConfig.valueOr("ws_max_sending_queue_size", 1500); + auto server = std::make_shared>( ioc, std::move(expectedSslContext).value(), @@ -355,7 +382,8 @@ make_HttpServer( util::TagDecoratorFactory(config), dosGuard, handler, - std::move(adminPassword) + std::move(adminPassword), + maxWsSendingQueueSize ); server->run(); diff --git a/src/web/SslHttpSession.hpp b/src/web/SslHttpSession.hpp index d5590aba..82814e69 100644 --- a/src/web/SslHttpSession.hpp +++ b/src/web/SslHttpSession.hpp @@ -37,6 +37,7 @@ #include #include +#include #include #include #include @@ -59,6 +60,7 @@ class SslHttpSession : public impl::HttpBase, public std::enable_shared_from_this> { boost::beast::ssl_stream stream_; std::reference_wrapper tagFactory_; + std::uint32_t maxWsSendingQueueSize_; public: /** @@ -72,6 +74,7 @@ public: * @param dosGuard The denial of service guard to use * @param handler The server handler to use * @param buffer Buffer with initial data received from the peer + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ explicit SslHttpSession( tcp::socket&& socket, @@ -81,7 +84,8 @@ public: std::reference_wrapper tagFactory, std::reference_wrapper dosGuard, std::shared_ptr const& handler, - boost::beast::flat_buffer buffer + boost::beast::flat_buffer buffer, + std::uint32_t maxWsSendingQueueSize ) : impl::HttpBase( ip, @@ -93,6 +97,7 @@ public: ) , stream_(std::move(socket), ctx) , tagFactory_(tagFactory) + , maxWsSendingQueueSize_(maxWsSendingQueueSize) { } @@ -173,7 +178,8 @@ public: this->handler_, std::move(this->buffer_), std::move(this->req_), - ConnectionBase::isAdmin() + ConnectionBase::isAdmin(), + maxWsSendingQueueSize_ ) ->run(); } diff --git a/src/web/SslWsSession.hpp b/src/web/SslWsSession.hpp index 1e33fc59..ee39e34c 100644 --- a/src/web/SslWsSession.hpp +++ b/src/web/SslWsSession.hpp @@ -36,6 +36,7 @@ #include #include +#include #include #include #include @@ -64,6 +65,7 @@ public: * @param handler The server handler to use * @param buffer Buffer with initial data received from the peer * @param isAdmin Whether the connection has admin privileges + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ explicit SslWsSession( boost::beast::ssl_stream&& stream, @@ -72,9 +74,17 @@ public: std::reference_wrapper dosGuard, std::shared_ptr const& handler, boost::beast::flat_buffer&& buffer, - bool isAdmin + bool isAdmin, + std::uint32_t maxWsSendingQueueSize ) - : impl::WsBase(ip, tagFactory, dosGuard, handler, std::move(buffer)) + : impl::WsBase( + ip, + tagFactory, + dosGuard, + handler, + std::move(buffer), + maxWsSendingQueueSize + ) , ws_(std::move(stream)) { ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer) @@ -106,6 +116,7 @@ class SslWsUpgrader : public std::enable_shared_from_this const handler_; http::request req_; bool isAdmin_; + std::uint32_t maxWsSendingQueueSize_; public: /** @@ -119,6 +130,7 @@ public: * @param buffer Buffer with initial data received from the peer. Ownership is transferred * @param request The request. Ownership is transferred * @param isAdmin Whether the connection has admin privileges + * @param maxWsSendingQueueSize The maximum size of the sending queue for websocket */ SslWsUpgrader( boost::beast::ssl_stream stream, @@ -128,7 +140,8 @@ public: std::shared_ptr handler, boost::beast::flat_buffer&& buffer, http::request request, - bool isAdmin + bool isAdmin, + std::uint32_t maxWsSendingQueueSize ) : https_(std::move(stream)) , buffer_(std::move(buffer)) @@ -138,6 +151,7 @@ public: , handler_(std::move(handler)) , req_(std::move(request)) , isAdmin_(isAdmin) + , maxWsSendingQueueSize_(maxWsSendingQueueSize) { } @@ -179,7 +193,14 @@ private: boost::beast::get_lowest_layer(https_).expires_never(); std::make_shared>( - std::move(https_), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_), isAdmin_ + std::move(https_), + ip_, + tagFactory_, + dosGuard_, + handler_, + std::move(buffer_), + isAdmin_, + maxWsSendingQueueSize_ ) ->run(std::move(req_)); } diff --git a/src/web/impl/WsBase.hpp b/src/web/impl/WsBase.hpp index bffd7bee..2d941bd7 100644 --- a/src/web/impl/WsBase.hpp +++ b/src/web/impl/WsBase.hpp @@ -23,9 +23,6 @@ #include "rpc/common/Types.hpp" #include "util/Taggable.hpp" #include "util/log/Logger.hpp" -#include "util/prometheus/Gauge.hpp" -#include "util/prometheus/Label.hpp" -#include "util/prometheus/Prometheus.hpp" #include "web/dosguard/DOSGuardInterface.hpp" #include "web/interface/Concepts.hpp" #include "web/interface/ConnectionBase.hpp" @@ -41,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -50,6 +48,7 @@ #include #include +#include #include #include #include @@ -74,14 +73,14 @@ template