Use separate IO context for socket IO (#168)

* Keep track of number of requests currently being processed
* Reject new requests when number of in flight requests exceeds a
  configurable limit
* Track time spent between request arrival and start of request
  processing

Signed-off-by: CJ Cobb <ccobb@ripple.com>

Co-authored-by: natenichols <natenichols@cox.net>
This commit is contained in:
CJ Cobb
2022-06-15 17:17:15 -04:00
committed by GitHub
parent b45b34edb1
commit d0ea9d20ab
12 changed files with 202 additions and 35 deletions

View File

@@ -58,6 +58,7 @@ target_sources(clio PRIVATE
src/rpc/RPC.cpp
src/rpc/RPCHelpers.cpp
src/rpc/Counters.cpp
src/rpc/WorkQueue.cpp
## RPC Methods
# Account
src/rpc/handlers/AccountChannels.cpp

View File

@@ -227,16 +227,16 @@ main(int argc, char* argv[])
? std::optional<std::reference_wrapper<ssl::context>>{ctx.value()}
: std::nullopt;
auto const threads = config->contains("workers")
? config->at("workers").as_int64()
: std::thread::hardware_concurrency();
auto const threads = config->contains("io_threads")
? config->at("io_threads").as_int64()
: 2;
if (threads <= 0)
{
BOOST_LOG_TRIVIAL(fatal) << "Workers is less than 0";
BOOST_LOG_TRIVIAL(fatal) << "io_threads is less than 0";
return EXIT_FAILURE;
}
BOOST_LOG_TRIVIAL(info) << "Number of workers = " << threads;
BOOST_LOG_TRIVIAL(info) << "Number of io threads = " << threads;
// io context to handle all incoming requests, as well as other things
// This is not the only io context in the application

11
src/rpc/WorkQueue.cpp Normal file
View File

@@ -0,0 +1,11 @@
#include <rpc/WorkQueue.h>
WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize)
{
if (maxSize != 0)
maxSize_ = maxSize;
while (--numWorkers)
{
threads_.emplace_back([this] { ioc_.run(); });
}
}

81
src/rpc/WorkQueue.h Normal file
View File

@@ -0,0 +1,81 @@
#ifndef CLIO_WORK_QUEUE_H
#define CLIO_WORK_QUEUE_H
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json.hpp>
#include <boost/log/trivial.hpp>
#include <memory>
#include <queue>
#include <shared_mutex>
#include <thread>
class WorkQueue
{
// these are cumulative for the lifetime of the process
std::atomic_uint64_t queued_ = 0;
std::atomic_uint64_t durationUs_ = 0;
std::atomic_uint64_t curSize_ = 0;
uint32_t maxSize_ = std::numeric_limits<uint32_t>::max();
public:
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
template <typename F>
bool
postCoro(F&& f, bool isWhiteListed)
{
if (curSize_ >= maxSize_ && !isWhiteListed)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__
<< " queue is full. rejecting job. current size = " << curSize_
<< " max size = " << maxSize_;
return false;
}
++curSize_;
auto start = std::chrono::system_clock::now();
// Each time we enqueue a job, we want to post a symmetrical job that
// will dequeue and run the job at the front of the job queue.
boost::asio::spawn(
ioc_,
[this, f = std::move(f), start](boost::asio::yield_context yield) {
auto run = std::chrono::system_clock::now();
auto wait =
std::chrono::duration_cast<std::chrono::microseconds>(
run - start)
.count();
// increment queued_ here, in the same place we implement
// durationUs_
++queued_;
durationUs_ += wait;
BOOST_LOG_TRIVIAL(debug) << "WorkQueue wait time = " << wait
<< " queue size = " << curSize_;
f(yield);
--curSize_;
});
return true;
}
// TODO: this is not actually being called. Wait for application refactor
boost::json::object
report()
{
boost::json::object obj;
obj["queued"] = queued_;
obj["queued_duration_us"] = durationUs_;
obj["current_queue_size"] = curSize_;
obj["max_queue_size"] = maxSize_;
return obj;
}
private:
std::vector<std::thread> threads_ = {};
boost::asio::io_context ioc_ = {};
std::optional<boost::asio::io_context::work> work_{ioc_};
};
#endif // CLIO_WORK_QUEUE_H

View File

@@ -92,6 +92,12 @@ public:
});
}
bool
isWhiteListed(std::string const& ip)
{
return whitelist_.contains(ip);
}
bool
isOk(std::string const& ip)
{

View File

@@ -20,6 +20,7 @@
#include <rpc/Counters.h>
#include <rpc/RPC.h>
#include <rpc/WorkQueue.h>
#include <vector>
#include <webserver/DOSGuard.h>
@@ -92,6 +93,7 @@ class HttpBase
std::shared_ptr<ReportingETL const> etl_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& workQueue_;
send_lambda lambda_;
protected:
@@ -146,6 +148,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer buffer)
: ioc_(ioc)
, backend_(backend)
@@ -154,6 +157,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, workQueue_(queue)
, lambda_(*this)
, buffer_(std::move(buffer))
{
@@ -208,7 +212,8 @@ public:
balancer_,
etl_,
dosGuard_,
counters_);
counters_,
workQueue_);
}
auto ip = derived().ip();
@@ -220,22 +225,32 @@ public:
// Requests are handed using coroutines. Here we spawn a coroutine
// which will asynchronously handle a request.
boost::asio::spawn(
derived().stream().get_executor(),
[this, ip, session](boost::asio::yield_context yield) {
handle_request(
yield,
std::move(req_),
lambda_,
backend_,
subscriptions_,
balancer_,
etl_,
dosGuard_,
counters_,
*ip,
session);
});
if (!workQueue_.postCoro(
[this, ip, session](boost::asio::yield_context yield) {
handle_request(
yield,
std::move(req_),
lambda_,
backend_,
subscriptions_,
balancer_,
etl_,
dosGuard_,
counters_,
*ip,
session);
},
dosGuard_.isWhiteListed(*ip)))
{
http::response<http::string_body> res{
http::status::ok, req_.version()};
res.set(http::field::server, "clio-server-v0.0.0");
res.set(http::field::content_type, "application/json");
res.keep_alive(req_.keep_alive());
res.body() = "Server overloaded";
res.prepare_payload();
lambda_(std::move(res));
}
}
void

View File

@@ -25,6 +25,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer buffer)
: HttpBase<HttpSession>(
ioc,
@@ -34,6 +35,7 @@ public:
etl,
dosGuard,
counters,
queue,
std::move(buffer))
, stream_(std::move(socket))
{

View File

@@ -30,6 +30,7 @@ class Detector
std::shared_ptr<ReportingETL const> etl_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
boost::beast::flat_buffer buffer_;
public:
@@ -42,7 +43,8 @@ public:
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters)
RPC::Counters& counters,
WorkQueue& queue)
: ioc_(ioc)
, stream_(std::move(socket))
, ctx_(ctx)
@@ -52,6 +54,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
{
}
@@ -101,6 +104,7 @@ public:
etl_,
dosGuard_,
counters_,
queue_,
std::move(buffer_))
->run();
return;
@@ -116,6 +120,7 @@ public:
etl_,
dosGuard_,
counters_,
queue_,
std::move(buffer_))
->run();
}
@@ -132,7 +137,8 @@ make_websocket_session(
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters)
RPC::Counters& counters,
WorkQueue& queue)
{
std::make_shared<WsUpgrader>(
ioc,
@@ -143,6 +149,7 @@ make_websocket_session(
etl,
dosGuard,
counters,
queue,
std::move(buffer),
std::move(req))
->run();
@@ -159,7 +166,8 @@ make_websocket_session(
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters)
RPC::Counters& counters,
WorkQueue& queue)
{
std::make_shared<SslWsUpgrader>(
ioc,
@@ -170,6 +178,7 @@ make_websocket_session(
etl,
dosGuard,
counters,
queue,
std::move(buffer),
std::move(req))
->run();
@@ -190,11 +199,14 @@ class Listener
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
DOSGuard& dosGuard_;
WorkQueue queue_;
RPC::Counters counters_;
public:
Listener(
boost::asio::io_context& ioc,
uint32_t numWorkerThreads,
uint32_t maxQueueSize,
std::optional<std::reference_wrapper<ssl::context>> ctx,
tcp::endpoint endpoint,
std::shared_ptr<BackendInterface const> backend,
@@ -210,6 +222,7 @@ public:
, balancer_(balancer)
, etl_(etl)
, dosGuard_(dosGuard)
, queue_(numWorkerThreads, maxQueueSize)
{
boost::beast::error_code ec;
@@ -271,7 +284,8 @@ private:
balancer_,
etl_,
dosGuard_,
counters_)
counters_,
queue_)
->run();
}
@@ -306,8 +320,19 @@ make_HttpServer(
auto const port =
static_cast<unsigned short>(serverConfig.at("port").as_int64());
uint32_t numThreads = std::thread::hardware_concurrency();
if (serverConfig.contains("workers"))
numThreads = serverConfig.at("workers").as_int64();
uint32_t maxQueueSize = 0; // no max
if (serverConfig.contains("max_queue_size"))
maxQueueSize = serverConfig.at("max_queue_size").as_int64();
BOOST_LOG_TRIVIAL(info) << __func__ << " Number of workers = " << numThreads
<< ". Max queue size = " << maxQueueSize;
auto server = std::make_shared<HttpServer>(
ioc,
numThreads,
maxQueueSize,
sslCtx,
boost::asio::ip::tcp::endpoint{address, port},
backend,

View File

@@ -38,6 +38,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& buffer)
: WsSession(
ioc,
@@ -47,6 +48,7 @@ public:
etl,
dosGuard,
counters,
queue,
std::move(buffer))
, ws_(std::move(socket))
{
@@ -91,6 +93,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
std::shared_ptr<ReportingETL const> etl_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
http::request<http::string_body> req_;
public:
@@ -103,6 +106,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& b)
: ioc_(ioc)
, http_(std::move(socket))
@@ -113,6 +117,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
{
}
WsUpgrader(
@@ -124,6 +129,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& b,
http::request<http::string_body> req)
: ioc_(ioc)
@@ -135,6 +141,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
, req_(std::move(req))
{
}
@@ -190,6 +197,7 @@ private:
etl_,
dosGuard_,
counters_,
queue_,
std::move(buffer_))
->run(std::move(req_));
}

View File

@@ -26,6 +26,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer buffer)
: HttpBase<SslHttpSession>(
ioc,
@@ -35,6 +36,7 @@ public:
etl,
dosGuard,
counters,
queue,
std::move(buffer))
, stream_(std::move(socket), ctx)
{

View File

@@ -36,6 +36,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& b)
: WsSession(
ioc,
@@ -45,6 +46,7 @@ public:
etl,
dosGuard,
counters,
queue,
std::move(b))
, ws_(std::move(stream))
{
@@ -88,6 +90,7 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
std::shared_ptr<ReportingETL const> etl_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
http::request<http::string_body> req_;
public:
@@ -101,6 +104,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& b)
: ioc_(ioc)
, https_(std::move(socket), ctx)
@@ -111,6 +115,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
{
}
SslWsUpgrader(
@@ -122,6 +127,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& b,
http::request<http::string_body> req)
: ioc_(ioc)
@@ -133,6 +139,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
, req_(std::move(req))
{
}
@@ -203,6 +210,7 @@ private:
etl_,
dosGuard_,
counters_,
queue_,
std::move(buffer_))
->run(std::move(req_));
}

View File

@@ -12,6 +12,7 @@
#include <etl/ReportingETL.h>
#include <rpc/Counters.h>
#include <rpc/RPC.h>
#include <rpc/WorkQueue.h>
#include <subscriptions/Message.h>
#include <subscriptions/SubscriptionManager.h>
#include <webserver/DOSGuard.h>
@@ -86,6 +87,7 @@ class WsSession : public WsBase,
std::shared_ptr<ReportingETL const> etl_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
std::mutex mtx_;
bool sending_ = false;
@@ -115,6 +117,7 @@ public:
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& buffer)
: buffer_(std::move(buffer))
, ioc_(ioc)
@@ -124,6 +127,7 @@ public:
, etl_(etl)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
{
}
virtual ~WsSession()
@@ -364,25 +368,29 @@ public:
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " received request from ip = " << *ip;
if (!dosGuard_.isOk(*ip))
{
auto sendError = [&](auto&& msg) {
boost::json::object response;
response["error"] = "Too many requests. Slow down";
response["error"] = std::move(msg);
std::string responseStr = boost::json::serialize(response);
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << responseStr;
dosGuard_.add(*ip, responseStr.size());
send(std::move(responseStr));
};
if (!dosGuard_.isOk(*ip))
{
sendError("Too many requests. Slow down");
}
else
{
boost::asio::spawn(
derived().ws().get_executor(),
[m = std::move(msg), shared_this = shared_from_this()](
boost::asio::yield_context yield) {
shared_this->handle_request(std::move(m), yield);
});
if (!queue_.postCoro(
[m = std::move(msg), shared_this = shared_from_this()](
boost::asio::yield_context yield) {
shared_this->handle_request(std::move(m), yield);
},
dosGuard_.isWhiteListed(*ip)))
sendError("Server overloaded");
}
do_read();