mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Make Websocket send queue configurable
This commit is contained in:
committed by
Scott Schurr
parent
00c60d408a
commit
2e5ab4e0e3
@@ -273,6 +273,12 @@
|
|||||||
# keep rippled from connecting to other instances of rippled or
|
# keep rippled from connecting to other instances of rippled or
|
||||||
# prevent RPC and WebSocket clients from connecting.
|
# prevent RPC and WebSocket clients from connecting.
|
||||||
#
|
#
|
||||||
|
# send_queue_limit = = [1..65535]
|
||||||
|
#
|
||||||
|
# A Websocket will disconnect when its send queue exceeds this limit.
|
||||||
|
# The default is 100. A larger value may help with erratic disconnects but
|
||||||
|
# may adversely affect server performance.
|
||||||
|
#
|
||||||
# WebSocket permessage-deflate extension options
|
# WebSocket permessage-deflate extension options
|
||||||
#
|
#
|
||||||
# These settings configure the optional permessage-deflate extension
|
# These settings configure the optional permessage-deflate extension
|
||||||
@@ -294,7 +300,7 @@
|
|||||||
# extension negotiation. For precise definitions of these fields please see
|
# extension negotiation. For precise definitions of these fields please see
|
||||||
# the RFC 7692, "Compression Extensions for WebSocket":
|
# the RFC 7692, "Compression Extensions for WebSocket":
|
||||||
# https://tools.ietf.org/html/rfc7692
|
# https://tools.ietf.org/html/rfc7692
|
||||||
#
|
#
|
||||||
# compress_level = [0..9]
|
# compress_level = [0..9]
|
||||||
#
|
#
|
||||||
# When set, determines the amount of compression attempted, where 0 is
|
# When set, determines the amount of compression attempted, where 0 is
|
||||||
|
|||||||
@@ -832,6 +832,7 @@ to_Port(ParsedPort const& parsed, std::ostream& log)
|
|||||||
p.ssl_chain = parsed.ssl_chain;
|
p.ssl_chain = parsed.ssl_chain;
|
||||||
p.ssl_ciphers = parsed.ssl_ciphers;
|
p.ssl_ciphers = parsed.ssl_ciphers;
|
||||||
p.pmd_options = parsed.pmd_options;
|
p.pmd_options = parsed.pmd_options;
|
||||||
|
p.ws_queue_limit = parsed.ws_queue_limit;
|
||||||
|
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,6 +58,9 @@ struct Port
|
|||||||
// port in the range [0, 65535] where 0 means unlimited.
|
// port in the range [0, 65535] where 0 means unlimited.
|
||||||
int limit = 0;
|
int limit = 0;
|
||||||
|
|
||||||
|
// Websocket disconnects if send queue exceeds this limit
|
||||||
|
std::uint16_t ws_queue_limit;
|
||||||
|
|
||||||
// Returns `true` if any websocket protocols are specified
|
// Returns `true` if any websocket protocols are specified
|
||||||
bool websockets() const;
|
bool websockets() const;
|
||||||
|
|
||||||
@@ -87,6 +90,7 @@ struct ParsedPort
|
|||||||
std::string ssl_ciphers;
|
std::string ssl_ciphers;
|
||||||
beast::websocket::permessage_deflate pmd_options;
|
beast::websocket::permessage_deflate pmd_options;
|
||||||
int limit = 0;
|
int limit = 0;
|
||||||
|
std::uint16_t ws_queue_limit;
|
||||||
|
|
||||||
boost::optional<boost::asio::ip::address> ip;
|
boost::optional<boost::asio::ip::address> ip;
|
||||||
boost::optional<std::uint16_t> port;
|
boost::optional<std::uint16_t> port;
|
||||||
|
|||||||
@@ -217,21 +217,18 @@ void
|
|||||||
BaseWSPeer<Handler, Impl>::
|
BaseWSPeer<Handler, Impl>::
|
||||||
send(std::shared_ptr<WSMsg> w)
|
send(std::shared_ptr<WSMsg> w)
|
||||||
{
|
{
|
||||||
// Maximum send queue size
|
|
||||||
static std::size_t constexpr limit = 100;
|
|
||||||
if(! strand_.running_in_this_thread())
|
if(! strand_.running_in_this_thread())
|
||||||
return strand_.post(std::bind(
|
return strand_.post(std::bind(
|
||||||
&BaseWSPeer::send, impl().shared_from_this(),
|
&BaseWSPeer::send, impl().shared_from_this(),
|
||||||
std::move(w)));
|
std::move(w)));
|
||||||
if(do_close_)
|
if(do_close_)
|
||||||
return;
|
return;
|
||||||
if(wq_.size() >= limit)
|
if(wq_.size() > port().ws_queue_limit)
|
||||||
{
|
{
|
||||||
JLOG(this->j_.info()) <<
|
JLOG(this->j_.info()) <<
|
||||||
"closing slow client";
|
"closing slow client";
|
||||||
cr_.code = static_cast<beast::websocket::close_code::value>(4000);
|
cr_.code = static_cast<beast::websocket::close_code::value>(4000);
|
||||||
cr_.reason = "Client is too slow.";
|
cr_.reason = "Client is too slow.";
|
||||||
static_assert(limit >= 1, "");
|
|
||||||
wq_.erase(std::next(wq_.begin()), wq_.end());
|
wq_.erase(std::next(wq_.begin()), wq_.end());
|
||||||
close();
|
close();
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -205,6 +205,34 @@ parse_Port (ParsedPort& port, Section const& section, std::ostream& log)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
auto const result = section.find("send_queue_limit");
|
||||||
|
if (result.second)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
port.ws_queue_limit =
|
||||||
|
beast::lexicalCastThrow<std::uint16_t>(result.first);
|
||||||
|
|
||||||
|
// Queue must be greater than 0
|
||||||
|
if (port.ws_queue_limit == 0)
|
||||||
|
Throw<std::exception>();
|
||||||
|
}
|
||||||
|
catch (std::exception const&)
|
||||||
|
{
|
||||||
|
log <<
|
||||||
|
"Invalid value '" << result.first << "' for key " <<
|
||||||
|
"'send_queue_limit' in [" << section.name() << "]\n";
|
||||||
|
Rethrow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Default Websocket send queue size limit
|
||||||
|
port.ws_queue_limit = 100;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
populate (section, "admin", log, port.admin_ip, true, {});
|
populate (section, "admin", log, port.admin_ip, true, {});
|
||||||
populate (section, "secure_gateway", log, port.secure_gateway_ip, false,
|
populate (section, "secure_gateway", log, port.secure_gateway_ip, false,
|
||||||
port.admin_ip.get_value_or({}));
|
port.admin_ip.get_value_or({}));
|
||||||
|
|||||||
Reference in New Issue
Block a user