From 2e5ab4e0e3d828e23242e5b35873b6efa78401c9 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Fri, 31 Mar 2017 20:23:19 -0400 Subject: [PATCH] Make Websocket send queue configurable --- doc/rippled-example.cfg | 8 ++++++- src/ripple/rpc/impl/ServerHandlerImp.cpp | 1 + src/ripple/server/Port.h | 4 ++++ src/ripple/server/impl/BaseWSPeer.h | 5 +---- src/ripple/server/impl/Port.cpp | 28 ++++++++++++++++++++++++ 5 files changed, 41 insertions(+), 5 deletions(-) diff --git a/doc/rippled-example.cfg b/doc/rippled-example.cfg index d3a3293c29..b9cb9898cd 100644 --- a/doc/rippled-example.cfg +++ b/doc/rippled-example.cfg @@ -273,6 +273,12 @@ # keep rippled from connecting to other instances of rippled or # prevent RPC and WebSocket clients from connecting. # +# send_queue_limit = = [1..65535] +# +# A Websocket will disconnect when its send queue exceeds this limit. +# The default is 100. A larger value may help with erratic disconnects but +# may adversely affect server performance. +# # WebSocket permessage-deflate extension options # # These settings configure the optional permessage-deflate extension @@ -294,7 +300,7 @@ # extension negotiation. For precise definitions of these fields please see # the RFC 7692, "Compression Extensions for WebSocket": # https://tools.ietf.org/html/rfc7692 -# +# # compress_level = [0..9] # # When set, determines the amount of compression attempted, where 0 is diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index 81f5d62e41..338201a60e 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -832,6 +832,7 @@ to_Port(ParsedPort const& parsed, std::ostream& log) p.ssl_chain = parsed.ssl_chain; p.ssl_ciphers = parsed.ssl_ciphers; p.pmd_options = parsed.pmd_options; + p.ws_queue_limit = parsed.ws_queue_limit; return p; } diff --git a/src/ripple/server/Port.h b/src/ripple/server/Port.h index b6b2bc8fa2..527d26554b 100644 --- a/src/ripple/server/Port.h +++ b/src/ripple/server/Port.h @@ -58,6 +58,9 @@ struct Port // port in the range [0, 65535] where 0 means unlimited. int limit = 0; + // Websocket disconnects if send queue exceeds this limit + std::uint16_t ws_queue_limit; + // Returns `true` if any websocket protocols are specified bool websockets() const; @@ -87,6 +90,7 @@ struct ParsedPort std::string ssl_ciphers; beast::websocket::permessage_deflate pmd_options; int limit = 0; + std::uint16_t ws_queue_limit; boost::optional ip; boost::optional port; diff --git a/src/ripple/server/impl/BaseWSPeer.h b/src/ripple/server/impl/BaseWSPeer.h index 124026e5af..5e9946837b 100644 --- a/src/ripple/server/impl/BaseWSPeer.h +++ b/src/ripple/server/impl/BaseWSPeer.h @@ -217,21 +217,18 @@ void BaseWSPeer:: send(std::shared_ptr w) { - // Maximum send queue size - static std::size_t constexpr limit = 100; if(! strand_.running_in_this_thread()) return strand_.post(std::bind( &BaseWSPeer::send, impl().shared_from_this(), std::move(w))); if(do_close_) return; - if(wq_.size() >= limit) + if(wq_.size() > port().ws_queue_limit) { JLOG(this->j_.info()) << "closing slow client"; cr_.code = static_cast(4000); cr_.reason = "Client is too slow."; - static_assert(limit >= 1, ""); wq_.erase(std::next(wq_.begin()), wq_.end()); close(); return; diff --git a/src/ripple/server/impl/Port.cpp b/src/ripple/server/impl/Port.cpp index b574af160a..1d85885871 100644 --- a/src/ripple/server/impl/Port.cpp +++ b/src/ripple/server/impl/Port.cpp @@ -205,6 +205,34 @@ parse_Port (ParsedPort& port, Section const& section, std::ostream& log) } } + { + auto const result = section.find("send_queue_limit"); + if (result.second) + { + try + { + port.ws_queue_limit = + beast::lexicalCastThrow(result.first); + + // Queue must be greater than 0 + if (port.ws_queue_limit == 0) + Throw(); + } + catch (std::exception const&) + { + log << + "Invalid value '" << result.first << "' for key " << + "'send_queue_limit' in [" << section.name() << "]\n"; + Rethrow(); + } + } + else + { + // Default Websocket send queue size limit + port.ws_queue_limit = 100; + } + } + populate (section, "admin", log, port.admin_ip, true, {}); populate (section, "secure_gateway", log, port.secure_gateway_ip, false, port.admin_ip.get_value_or({}));