fix: Add queue size limit for websocket (#1701)

For slow clients, we will disconnect with it if the message queue is too
long.

---------

Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
This commit is contained in:
cyan317
2024-10-25 13:30:52 +01:00
committed by GitHub
parent f083c82557
commit 1c82d379d9
8 changed files with 132 additions and 96 deletions

View File

@@ -75,9 +75,9 @@
// For sequent policy request from one client connection will be processed one by one and the next one will not be read before // For sequent policy request from one client connection will be processed one by one and the next one will not be read before
// the previous one is processed. For parallel policy Clio will take all requests and process them in parallel and // the previous one is processed. For parallel policy Clio will take all requests and process them in parallel and
// send a reply for each request whenever it is ready. // send a reply for each request whenever it is ready.
"parallel_requests_limit": 10 // Optional parameter, used only if "processing_strategy" is "parallel". "parallel_requests_limit": 10, // Optional parameter, used only if "processing_strategy" is "parallel". It limits the number of requests for one client connection processed in parallel. Infinite if not specified.
It limits the number of requests for one client connection processed in parallel. Infinite if not specified. // Max number of responses to queue up before sent successfully. If a client's waiting queue is too long, the server will close the connection.
"ws_max_sending_queue_size": 1500
}, },
// Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet. // Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet.
"graceful_period": 10.0, "graceful_period": 10.0,

View File

@@ -30,6 +30,7 @@
#include <boost/beast/core/flat_buffer.hpp> #include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp> #include <boost/beast/core/tcp_stream.hpp>
#include <cstdint>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string> #include <string>
@@ -52,6 +53,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
public std::enable_shared_from_this<HttpSession<HandlerType>> { public std::enable_shared_from_this<HttpSession<HandlerType>> {
boost::beast::tcp_stream stream_; boost::beast::tcp_stream stream_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_; std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::uint32_t maxWsSendingQueueSize_;
public: public:
/** /**
@@ -64,6 +66,7 @@ public:
* @param dosGuard The denial of service guard to use * @param dosGuard The denial of service guard to use
* @param handler The server handler to use * @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer * @param buffer Buffer with initial data received from the peer
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
explicit HttpSession( explicit HttpSession(
tcp::socket&& socket, tcp::socket&& socket,
@@ -72,7 +75,8 @@ public:
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory, std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard, std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler, std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer buffer boost::beast::flat_buffer buffer,
std::uint32_t maxWsSendingQueueSize
) )
: impl::HttpBase<HttpSession, HandlerType>( : impl::HttpBase<HttpSession, HandlerType>(
ip, ip,
@@ -84,6 +88,7 @@ public:
) )
, stream_(std::move(socket)) , stream_(std::move(socket))
, tagFactory_(tagFactory) , tagFactory_(tagFactory)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{ {
} }
@@ -128,7 +133,8 @@ public:
this->handler_, this->handler_,
std::move(this->buffer_), std::move(this->buffer_),
std::move(this->req_), std::move(this->req_),
ConnectionBase::isAdmin() ConnectionBase::isAdmin(),
maxWsSendingQueueSize_
) )
->run(); ->run();
} }

View File

@@ -62,7 +62,8 @@ public:
* @param dosGuard The denial of service guard to use * @param dosGuard The denial of service guard to use
* @param handler The server handler to use * @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer * @param buffer Buffer with initial data received from the peer
* @param isAdmin Whether the connection has admin privileges * @param isAdmin Whether the connection has admin privileges,
* @param maxSendingQueueSize The maximum size of the sending queue for websocket
*/ */
explicit PlainWsSession( explicit PlainWsSession(
boost::asio::ip::tcp::socket&& socket, boost::asio::ip::tcp::socket&& socket,
@@ -71,9 +72,17 @@ public:
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard, std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler, std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer, boost::beast::flat_buffer&& buffer,
bool isAdmin bool isAdmin,
std::uint32_t maxSendingQueueSize
) )
: impl::WsBase<PlainWsSession, HandlerType>(ip, tagFactory, dosGuard, handler, std::move(buffer)) : impl::WsBase<PlainWsSession, HandlerType>(
ip,
tagFactory,
dosGuard,
handler,
std::move(buffer),
maxSendingQueueSize
)
, ws_(std::move(socket)) , ws_(std::move(socket))
{ {
ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer) ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer)
@@ -107,6 +116,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
std::string ip_; std::string ip_;
std::shared_ptr<HandlerType> const handler_; std::shared_ptr<HandlerType> const handler_;
bool isAdmin_; bool isAdmin_;
std::uint32_t maxWsSendingQueueSize_;
public: public:
/** /**
@@ -120,6 +130,7 @@ public:
* @param buffer Buffer with initial data received from the peer. Ownership is transferred * @param buffer Buffer with initial data received from the peer. Ownership is transferred
* @param request The request. Ownership is transferred * @param request The request. Ownership is transferred
* @param isAdmin Whether the connection has admin privileges * @param isAdmin Whether the connection has admin privileges
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
WsUpgrader( WsUpgrader(
boost::beast::tcp_stream&& stream, boost::beast::tcp_stream&& stream,
@@ -129,7 +140,8 @@ public:
std::shared_ptr<HandlerType> const& handler, std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer, boost::beast::flat_buffer&& buffer,
http::request<http::string_body> request, http::request<http::string_body> request,
bool isAdmin bool isAdmin,
std::uint32_t maxWsSendingQueueSize
) )
: http_(std::move(stream)) : http_(std::move(stream))
, buffer_(std::move(buffer)) , buffer_(std::move(buffer))
@@ -139,6 +151,7 @@ public:
, ip_(std::move(ip)) , ip_(std::move(ip))
, handler_(handler) , handler_(handler)
, isAdmin_(isAdmin) , isAdmin_(isAdmin)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{ {
} }
@@ -175,7 +188,14 @@ private:
boost::beast::get_lowest_layer(http_).expires_never(); boost::beast::get_lowest_layer(http_).expires_never();
std::make_shared<PlainWsSession<HandlerType>>( std::make_shared<PlainWsSession<HandlerType>>(
http_.release_socket(), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_), isAdmin_ http_.release_socket(),
ip_,
tagFactory_,
dosGuard_,
handler_,
std::move(buffer_),
isAdmin_,
maxWsSendingQueueSize_
) )
->run(std::move(req_)); ->run(std::move(req_));
} }

View File

@@ -41,6 +41,7 @@
#include <fmt/core.h> #include <fmt/core.h>
#include <chrono> #include <chrono>
#include <cstdint>
#include <exception> #include <exception>
#include <functional> #include <functional>
#include <memory> #include <memory>
@@ -84,6 +85,7 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
std::shared_ptr<HandlerType> const handler_; std::shared_ptr<HandlerType> const handler_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::shared_ptr<impl::AdminVerificationStrategy> const adminVerification_; std::shared_ptr<impl::AdminVerificationStrategy> const adminVerification_;
std::uint32_t maxWsSendingQueueSize_;
public: public:
/** /**
@@ -95,6 +97,7 @@ public:
* @param dosGuard The denial of service guard to use * @param dosGuard The denial of service guard to use
* @param handler The server handler to use * @param handler The server handler to use
* @param adminVerification The admin verification strategy to use * @param adminVerification The admin verification strategy to use
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
Detector( Detector(
tcp::socket&& socket, tcp::socket&& socket,
@@ -102,7 +105,8 @@ public:
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory, std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard, std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> handler, std::shared_ptr<HandlerType> handler,
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification std::shared_ptr<impl::AdminVerificationStrategy> adminVerification,
std::uint32_t maxWsSendingQueueSize
) )
: stream_(std::move(socket)) : stream_(std::move(socket))
, ctx_(ctx) , ctx_(ctx)
@@ -110,6 +114,7 @@ public:
, dosGuard_(dosGuard) , dosGuard_(dosGuard)
, handler_(std::move(handler)) , handler_(std::move(handler))
, adminVerification_(std::move(adminVerification)) , adminVerification_(std::move(adminVerification))
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{ {
} }
@@ -167,14 +172,22 @@ public:
tagFactory_, tagFactory_,
dosGuard_, dosGuard_,
handler_, handler_,
std::move(buffer_) std::move(buffer_),
maxWsSendingQueueSize_
) )
->run(); ->run();
return; return;
} }
std::make_shared<PlainSessionType<HandlerType>>( std::make_shared<PlainSessionType<HandlerType>>(
stream_.release_socket(), ip, adminVerification_, tagFactory_, dosGuard_, handler_, std::move(buffer_) stream_.release_socket(),
ip,
adminVerification_,
tagFactory_,
dosGuard_,
handler_,
std::move(buffer_),
maxWsSendingQueueSize_
) )
->run(); ->run();
} }
@@ -204,6 +217,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
std::shared_ptr<HandlerType> handler_; std::shared_ptr<HandlerType> handler_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification_; std::shared_ptr<impl::AdminVerificationStrategy> adminVerification_;
std::uint32_t maxWsSendingQueueSize_;
public: public:
/** /**
@@ -216,6 +230,7 @@ public:
* @param dosGuard The denial of service guard to use * @param dosGuard The denial of service guard to use
* @param handler The server handler to use * @param handler The server handler to use
* @param adminPassword The optional password to verify admin role in requests * @param adminPassword The optional password to verify admin role in requests
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
Server( Server(
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
@@ -224,7 +239,8 @@ public:
util::TagDecoratorFactory tagFactory, util::TagDecoratorFactory tagFactory,
dosguard::DOSGuardInterface& dosGuard, dosguard::DOSGuardInterface& dosGuard,
std::shared_ptr<HandlerType> handler, std::shared_ptr<HandlerType> handler,
std::optional<std::string> adminPassword std::optional<std::string> adminPassword,
std::uint32_t maxWsSendingQueueSize
) )
: ioc_(std::ref(ioc)) : ioc_(std::ref(ioc))
, ctx_(std::move(ctx)) , ctx_(std::move(ctx))
@@ -233,6 +249,7 @@ public:
, handler_(std::move(handler)) , handler_(std::move(handler))
, acceptor_(boost::asio::make_strand(ioc)) , acceptor_(boost::asio::make_strand(ioc))
, adminVerification_(impl::make_AdminVerificationStrategy(std::move(adminPassword))) , adminVerification_(impl::make_AdminVerificationStrategy(std::move(adminPassword)))
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{ {
boost::beast::error_code ec; boost::beast::error_code ec;
@@ -286,7 +303,13 @@ private:
ctx_ ? std::optional<std::reference_wrapper<boost::asio::ssl::context>>{ctx_.value()} : std::nullopt; ctx_ ? std::optional<std::reference_wrapper<boost::asio::ssl::context>>{ctx_.value()} : std::nullopt;
std::make_shared<Detector<PlainSessionType, SslSessionType, HandlerType>>( std::make_shared<Detector<PlainSessionType, SslSessionType, HandlerType>>(
std::move(socket), ctxRef, std::cref(tagFactory_), dosGuard_, handler_, adminVerification_ std::move(socket),
ctxRef,
std::cref(tagFactory_),
dosGuard_,
handler_,
adminVerification_,
maxWsSendingQueueSize_
) )
->run(); ->run();
} }
@@ -348,6 +371,10 @@ make_HttpServer(
throw std::logic_error("Admin config error, one method must be specified to authorize admin."); throw std::logic_error("Admin config error, one method must be specified to authorize admin.");
} }
// If the transactions number is 200 per ledger, A client which subscribes everything will send 400+ feeds for
// each ledger. we allow user delay 3 ledgers by default
auto const maxWsSendingQueueSize = serverConfig.valueOr("ws_max_sending_queue_size", 1500);
auto server = std::make_shared<HttpServer<HandlerType>>( auto server = std::make_shared<HttpServer<HandlerType>>(
ioc, ioc,
std::move(expectedSslContext).value(), std::move(expectedSslContext).value(),
@@ -355,7 +382,8 @@ make_HttpServer(
util::TagDecoratorFactory(config), util::TagDecoratorFactory(config),
dosGuard, dosGuard,
handler, handler,
std::move(adminPassword) std::move(adminPassword),
maxWsSendingQueueSize
); );
server->run(); server->run();

View File

@@ -37,6 +37,7 @@
#include <chrono> #include <chrono>
#include <cstddef> #include <cstddef>
#include <cstdint>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string> #include <string>
@@ -59,6 +60,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
public std::enable_shared_from_this<SslHttpSession<HandlerType>> { public std::enable_shared_from_this<SslHttpSession<HandlerType>> {
boost::beast::ssl_stream<boost::beast::tcp_stream> stream_; boost::beast::ssl_stream<boost::beast::tcp_stream> stream_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_; std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::uint32_t maxWsSendingQueueSize_;
public: public:
/** /**
@@ -72,6 +74,7 @@ public:
* @param dosGuard The denial of service guard to use * @param dosGuard The denial of service guard to use
* @param handler The server handler to use * @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer * @param buffer Buffer with initial data received from the peer
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
explicit SslHttpSession( explicit SslHttpSession(
tcp::socket&& socket, tcp::socket&& socket,
@@ -81,7 +84,8 @@ public:
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory, std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard, std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler, std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer buffer boost::beast::flat_buffer buffer,
std::uint32_t maxWsSendingQueueSize
) )
: impl::HttpBase<SslHttpSession, HandlerType>( : impl::HttpBase<SslHttpSession, HandlerType>(
ip, ip,
@@ -93,6 +97,7 @@ public:
) )
, stream_(std::move(socket), ctx) , stream_(std::move(socket), ctx)
, tagFactory_(tagFactory) , tagFactory_(tagFactory)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{ {
} }
@@ -173,7 +178,8 @@ public:
this->handler_, this->handler_,
std::move(this->buffer_), std::move(this->buffer_),
std::move(this->req_), std::move(this->req_),
ConnectionBase::isAdmin() ConnectionBase::isAdmin(),
maxWsSendingQueueSize_
) )
->run(); ->run();
} }

View File

@@ -36,6 +36,7 @@
#include <boost/optional/optional.hpp> #include <boost/optional/optional.hpp>
#include <chrono> #include <chrono>
#include <cstdint>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string> #include <string>
@@ -64,6 +65,7 @@ public:
* @param handler The server handler to use * @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer * @param buffer Buffer with initial data received from the peer
* @param isAdmin Whether the connection has admin privileges * @param isAdmin Whether the connection has admin privileges
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
explicit SslWsSession( explicit SslWsSession(
boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream, boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream,
@@ -72,9 +74,17 @@ public:
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard, std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler, std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer, boost::beast::flat_buffer&& buffer,
bool isAdmin bool isAdmin,
std::uint32_t maxWsSendingQueueSize
) )
: impl::WsBase<SslWsSession, HandlerType>(ip, tagFactory, dosGuard, handler, std::move(buffer)) : impl::WsBase<SslWsSession, HandlerType>(
ip,
tagFactory,
dosGuard,
handler,
std::move(buffer),
maxWsSendingQueueSize
)
, ws_(std::move(stream)) , ws_(std::move(stream))
{ {
ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer) ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer)
@@ -106,6 +116,7 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader<HandlerT
std::shared_ptr<HandlerType> const handler_; std::shared_ptr<HandlerType> const handler_;
http::request<http::string_body> req_; http::request<http::string_body> req_;
bool isAdmin_; bool isAdmin_;
std::uint32_t maxWsSendingQueueSize_;
public: public:
/** /**
@@ -119,6 +130,7 @@ public:
* @param buffer Buffer with initial data received from the peer. Ownership is transferred * @param buffer Buffer with initial data received from the peer. Ownership is transferred
* @param request The request. Ownership is transferred * @param request The request. Ownership is transferred
* @param isAdmin Whether the connection has admin privileges * @param isAdmin Whether the connection has admin privileges
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/ */
SslWsUpgrader( SslWsUpgrader(
boost::beast::ssl_stream<boost::beast::tcp_stream> stream, boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
@@ -128,7 +140,8 @@ public:
std::shared_ptr<HandlerType> handler, std::shared_ptr<HandlerType> handler,
boost::beast::flat_buffer&& buffer, boost::beast::flat_buffer&& buffer,
http::request<http::string_body> request, http::request<http::string_body> request,
bool isAdmin bool isAdmin,
std::uint32_t maxWsSendingQueueSize
) )
: https_(std::move(stream)) : https_(std::move(stream))
, buffer_(std::move(buffer)) , buffer_(std::move(buffer))
@@ -138,6 +151,7 @@ public:
, handler_(std::move(handler)) , handler_(std::move(handler))
, req_(std::move(request)) , req_(std::move(request))
, isAdmin_(isAdmin) , isAdmin_(isAdmin)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{ {
} }
@@ -179,7 +193,14 @@ private:
boost::beast::get_lowest_layer(https_).expires_never(); boost::beast::get_lowest_layer(https_).expires_never();
std::make_shared<SslWsSession<HandlerType>>( std::make_shared<SslWsSession<HandlerType>>(
std::move(https_), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_), isAdmin_ std::move(https_),
ip_,
tagFactory_,
dosGuard_,
handler_,
std::move(buffer_),
isAdmin_,
maxWsSendingQueueSize_
) )
->run(std::move(req_)); ->run(std::move(req_));
} }

View File

@@ -23,9 +23,6 @@
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "util/Taggable.hpp" #include "util/Taggable.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "web/dosguard/DOSGuardInterface.hpp" #include "web/dosguard/DOSGuardInterface.hpp"
#include "web/interface/Concepts.hpp" #include "web/interface/Concepts.hpp"
#include "web/interface/ConnectionBase.hpp" #include "web/interface/ConnectionBase.hpp"
@@ -41,6 +38,7 @@
#include <boost/beast/http/status.hpp> #include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp> #include <boost/beast/http/string_body.hpp>
#include <boost/beast/version.hpp> #include <boost/beast/version.hpp>
#include <boost/beast/websocket/error.hpp>
#include <boost/beast/websocket/rfc6455.hpp> #include <boost/beast/websocket/rfc6455.hpp>
#include <boost/beast/websocket/stream_base.hpp> #include <boost/beast/websocket/stream_base.hpp>
#include <boost/core/ignore_unused.hpp> #include <boost/core/ignore_unused.hpp>
@@ -50,6 +48,7 @@
#include <xrpl/protocol/ErrorCodes.h> #include <xrpl/protocol/ErrorCodes.h>
#include <cstddef> #include <cstddef>
#include <cstdint>
#include <exception> #include <exception>
#include <functional> #include <functional>
#include <memory> #include <memory>
@@ -74,14 +73,14 @@ template <template <typename> typename Derived, SomeServerHandler HandlerType>
class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> { class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this; using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
std::reference_wrapper<util::prometheus::GaugeInt> messagesLength_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_; std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
bool sending_ = false; bool sending_ = false;
std::queue<std::shared_ptr<std::string>> messages_; std::queue<std::shared_ptr<std::string>> messages_;
std::shared_ptr<HandlerType> const handler_; std::shared_ptr<HandlerType> const handler_;
std::uint32_t maxSendingQueueSize_;
protected: protected:
util::Logger log_{"WebServer"}; util::Logger log_{"WebServer"};
util::Logger perfLog_{"Performance"}; util::Logger perfLog_{"Performance"};
@@ -90,9 +89,8 @@ protected:
wsFail(boost::beast::error_code ec, char const* what) wsFail(boost::beast::error_code ec, char const* what)
{ {
// Don't log if the WebSocket stream was gracefully closed at both endpoints // Don't log if the WebSocket stream was gracefully closed at both endpoints
if (ec != boost::beast::websocket::error::closed) { if (ec != boost::beast::websocket::error::closed)
LOG(perfLog_.error()) << tag() << ": " << what << ": " << ec.message() << ": " << ec.value(); LOG(log_.error()) << tag() << ": " << what << ": " << ec.message() << ": " << ec.value();
}
if (!ec_ && ec != boost::asio::error::operation_aborted) { if (!ec_ && ec != boost::asio::error::operation_aborted) {
ec_ = ec; ec_ = ec;
@@ -106,17 +104,14 @@ public:
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory, std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard, std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler, std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer boost::beast::flat_buffer&& buffer,
std::uint32_t maxSendingQueueSize
) )
: ConnectionBase(tagFactory, ip) : ConnectionBase(tagFactory, ip)
, messagesLength_(PrometheusService::gaugeInt(
"ws_messages_length",
util::prometheus::Labels(),
"The total length of messages in the queue"
))
, buffer_(std::move(buffer)) , buffer_(std::move(buffer))
, dosGuard_(dosGuard) , dosGuard_(dosGuard)
, handler_(handler) , handler_(handler)
, maxSendingQueueSize_(maxSendingQueueSize)
{ {
upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init) upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
@@ -126,8 +121,6 @@ public:
~WsBase() override ~WsBase() override
{ {
LOG(perfLog_.debug()) << tag() << "session closed"; LOG(perfLog_.debug()) << tag() << "session closed";
if (!messages_.empty())
messagesLength_.get() -= messages_.size();
dosGuard_.get().decrement(clientIp); dosGuard_.get().decrement(clientIp);
} }
@@ -151,7 +144,6 @@ public:
onWrite(boost::system::error_code ec, std::size_t) onWrite(boost::system::error_code ec, std::size_t)
{ {
messages_.pop(); messages_.pop();
--messagesLength_.get();
sending_ = false; sending_ = false;
if (ec) { if (ec) {
wsFail(ec, "Failed to write"); wsFail(ec, "Failed to write");
@@ -181,8 +173,12 @@ public:
boost::asio::dispatch( boost::asio::dispatch(
derived().ws().get_executor(), derived().ws().get_executor(),
[this, self = derived().shared_from_this(), msg = std::move(msg)]() { [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
if (messages_.size() > maxSendingQueueSize_) {
wsFail(boost::asio::error::timed_out, "Client is too slow");
return;
}
messages_.push(msg); messages_.push(msg);
++messagesLength_.get();
maybeSendNext(); maybeSendNext();
} }
); );

View File

@@ -24,7 +24,6 @@
#include "util/TestWebSocketClient.hpp" #include "util/TestWebSocketClient.hpp"
#include "util/TmpFile.hpp" #include "util/TmpFile.hpp"
#include "util/config/Config.hpp" #include "util/config/Config.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp" #include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp" #include "util/prometheus/Prometheus.hpp"
#include "web/Server.hpp" #include "web/Server.hpp"
@@ -45,7 +44,6 @@
#include <boost/json/value.hpp> #include <boost/json/value.hpp>
#include <boost/system/system_error.hpp> #include <boost/system/system_error.hpp>
#include <fmt/core.h> #include <fmt/core.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <test_data/SslCert.hpp> #include <test_data/SslCert.hpp>
@@ -152,8 +150,6 @@ private:
std::optional<std::thread> runner; std::optional<std::thread> runner;
}; };
struct WebServerTestsWithMockPrometheus : WebServerTest, prometheus::WithMockPrometheus {};
class EchoExecutor { class EchoExecutor {
public: public:
void void
@@ -214,7 +210,7 @@ makeServerSync(
} // namespace } // namespace
TEST_F(WebServerTestsWithMockPrometheus, Http) TEST_F(WebServerTest, Http)
{ {
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuard, e); auto const server = makeServerSync(cfg, ctx, dosGuard, e);
@@ -222,13 +218,8 @@ TEST_F(WebServerTestsWithMockPrometheus, Http)
EXPECT_EQ(res, R"({"Hello":1})"); EXPECT_EQ(res, R"({"Hello":1})");
} }
TEST_F(WebServerTestsWithMockPrometheus, Ws) TEST_F(WebServerTest, Ws)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuard, e); auto const server = makeServerSync(cfg, ctx, dosGuard, e);
WebSocketSyncClient wsClient; WebSocketSyncClient wsClient;
@@ -238,7 +229,7 @@ TEST_F(WebServerTestsWithMockPrometheus, Ws)
wsClient.disconnect(); wsClient.disconnect();
} }
TEST_F(WebServerTestsWithMockPrometheus, HttpInternalError) TEST_F(WebServerTest, HttpInternalError)
{ {
auto e = std::make_shared<ExceptionExecutor>(); auto e = std::make_shared<ExceptionExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuard, e); auto const server = makeServerSync(cfg, ctx, dosGuard, e);
@@ -249,13 +240,8 @@ TEST_F(WebServerTestsWithMockPrometheus, HttpInternalError)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, WsInternalError) TEST_F(WebServerTest, WsInternalError)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<ExceptionExecutor>(); auto e = std::make_shared<ExceptionExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuard, e); auto const server = makeServerSync(cfg, ctx, dosGuard, e);
WebSocketSyncClient wsClient; WebSocketSyncClient wsClient;
@@ -268,13 +254,8 @@ TEST_F(WebServerTestsWithMockPrometheus, WsInternalError)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, WsInternalErrorNotJson) TEST_F(WebServerTest, WsInternalErrorNotJson)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<ExceptionExecutor>(); auto e = std::make_shared<ExceptionExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuard, e); auto const server = makeServerSync(cfg, ctx, dosGuard, e);
WebSocketSyncClient wsClient; WebSocketSyncClient wsClient;
@@ -287,7 +268,7 @@ TEST_F(WebServerTestsWithMockPrometheus, WsInternalErrorNotJson)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, IncompleteSslConfig) TEST_F(WebServerTest, IncompleteSslConfig)
{ {
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
@@ -298,7 +279,7 @@ TEST_F(WebServerTestsWithMockPrometheus, IncompleteSslConfig)
EXPECT_EQ(server, nullptr); EXPECT_EQ(server, nullptr);
} }
TEST_F(WebServerTestsWithMockPrometheus, WrongSslConfig) TEST_F(WebServerTest, WrongSslConfig)
{ {
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
@@ -310,7 +291,7 @@ TEST_F(WebServerTestsWithMockPrometheus, WrongSslConfig)
EXPECT_EQ(server, nullptr); EXPECT_EQ(server, nullptr);
} }
TEST_F(WebServerTestsWithMockPrometheus, Https) TEST_F(WebServerTest, Https)
{ {
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
cfg = Config{addSslConfig(generateJSONWithDynamicPort(port))}; cfg = Config{addSslConfig(generateJSONWithDynamicPort(port))};
@@ -319,13 +300,8 @@ TEST_F(WebServerTestsWithMockPrometheus, Https)
EXPECT_EQ(res, R"({"Hello":1})"); EXPECT_EQ(res, R"({"Hello":1})");
} }
TEST_F(WebServerTestsWithMockPrometheus, Wss) TEST_F(WebServerTest, Wss)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
cfg = Config{addSslConfig(generateJSONWithDynamicPort(port))}; cfg = Config{addSslConfig(generateJSONWithDynamicPort(port))};
auto server = makeServerSync(cfg, ctx, dosGuard, e); auto server = makeServerSync(cfg, ctx, dosGuard, e);
@@ -336,7 +312,7 @@ TEST_F(WebServerTestsWithMockPrometheus, Wss)
wsClient.disconnect(); wsClient.disconnect();
} }
TEST_F(WebServerTestsWithMockPrometheus, HttpRequestOverload) TEST_F(WebServerTest, HttpRequestOverload)
{ {
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuardOverload, e); auto const server = makeServerSync(cfg, ctx, dosGuardOverload, e);
@@ -349,13 +325,8 @@ TEST_F(WebServerTestsWithMockPrometheus, HttpRequestOverload)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, WsRequestOverload) TEST_F(WebServerTest, WsRequestOverload)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1)).Times(2);
EXPECT_CALL(wsMessagesCounterMock, add(-1)).Times(2);
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, dosGuardOverload, e); auto const server = makeServerSync(cfg, ctx, dosGuardOverload, e);
WebSocketSyncClient wsClient; WebSocketSyncClient wsClient;
@@ -373,7 +344,7 @@ TEST_F(WebServerTestsWithMockPrometheus, WsRequestOverload)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, HttpPayloadOverload) TEST_F(WebServerTest, HttpPayloadOverload)
{ {
std::string const s100(100, 'a'); std::string const s100(100, 'a');
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
@@ -385,13 +356,8 @@ TEST_F(WebServerTestsWithMockPrometheus, HttpPayloadOverload)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, WsPayloadOverload) TEST_F(WebServerTest, WsPayloadOverload)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
std::string const s100(100, 'a'); std::string const s100(100, 'a');
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
auto server = makeServerSync(cfg, ctx, dosGuardOverload, e); auto server = makeServerSync(cfg, ctx, dosGuardOverload, e);
@@ -405,7 +371,7 @@ TEST_F(WebServerTestsWithMockPrometheus, WsPayloadOverload)
); );
} }
TEST_F(WebServerTestsWithMockPrometheus, WsTooManyConnection) TEST_F(WebServerTest, WsTooManyConnection)
{ {
auto e = std::make_shared<EchoExecutor>(); auto e = std::make_shared<EchoExecutor>();
auto server = makeServerSync(cfg, ctx, dosGuardOverload, e); auto server = makeServerSync(cfg, ctx, dosGuardOverload, e);
@@ -511,17 +477,10 @@ struct WebServerAdminTestParams {
std::string expectedResponse; std::string expectedResponse;
}; };
class WebServerAdminTest : public WebServerTest, class WebServerAdminTest : public WebServerTest, public ::testing::WithParamInterface<WebServerAdminTestParams> {};
public ::testing::WithParamInterface<WebServerAdminTestParams>,
public prometheus::WithMockPrometheus {};
TEST_P(WebServerAdminTest, WsAdminCheck) TEST_P(WebServerAdminTest, WsAdminCheck)
{ {
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<AdminCheckExecutor>(); auto e = std::make_shared<AdminCheckExecutor>();
Config const serverConfig{boost::json::parse(GetParam().config)}; Config const serverConfig{boost::json::parse(GetParam().config)};
auto server = makeServerSync(serverConfig, ctx, dosGuardOverload, e); auto server = makeServerSync(serverConfig, ctx, dosGuardOverload, e);