From d0ea9d20ab44c289ab15bf60561d24897605b94c Mon Sep 17 00:00:00 2001 From: CJ Cobb <46455409+cjcobb23@users.noreply.github.com> Date: Wed, 15 Jun 2022 17:17:15 -0400 Subject: [PATCH] Use separate IO context for socket IO (#168) * Keep track of number of requests currently being processed * Reject new requests when number of in flight requests exceeds a configurable limit * Track time spent between request arrival and start of request processing Signed-off-by: CJ Cobb Co-authored-by: natenichols --- CMakeLists.txt | 1 + src/main/main.cpp | 10 ++--- src/rpc/WorkQueue.cpp | 11 +++++ src/rpc/WorkQueue.h | 81 ++++++++++++++++++++++++++++++++++ src/webserver/DOSGuard.h | 6 +++ src/webserver/HttpBase.h | 49 +++++++++++++------- src/webserver/HttpSession.h | 2 + src/webserver/Listener.h | 33 ++++++++++++-- src/webserver/PlainWsSession.h | 8 ++++ src/webserver/SslHttpSession.h | 2 + src/webserver/SslWsSession.h | 8 ++++ src/webserver/WsBase.h | 26 +++++++---- 12 files changed, 202 insertions(+), 35 deletions(-) create mode 100644 src/rpc/WorkQueue.cpp create mode 100644 src/rpc/WorkQueue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6a64d84f..9531049e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,6 +58,7 @@ target_sources(clio PRIVATE src/rpc/RPC.cpp src/rpc/RPCHelpers.cpp src/rpc/Counters.cpp + src/rpc/WorkQueue.cpp ## RPC Methods # Account src/rpc/handlers/AccountChannels.cpp diff --git a/src/main/main.cpp b/src/main/main.cpp index 80bad852..18203371 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -227,16 +227,16 @@ main(int argc, char* argv[]) ? std::optional>{ctx.value()} : std::nullopt; - auto const threads = config->contains("workers") - ? config->at("workers").as_int64() - : std::thread::hardware_concurrency(); + auto const threads = config->contains("io_threads") + ? config->at("io_threads").as_int64() + : 2; if (threads <= 0) { - BOOST_LOG_TRIVIAL(fatal) << "Workers is less than 0"; + BOOST_LOG_TRIVIAL(fatal) << "io_threads is less than 0"; return EXIT_FAILURE; } - BOOST_LOG_TRIVIAL(info) << "Number of workers = " << threads; + BOOST_LOG_TRIVIAL(info) << "Number of io threads = " << threads; // io context to handle all incoming requests, as well as other things // This is not the only io context in the application diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp new file mode 100644 index 00000000..e270e820 --- /dev/null +++ b/src/rpc/WorkQueue.cpp @@ -0,0 +1,11 @@ +#include + +WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize) +{ + if (maxSize != 0) + maxSize_ = maxSize; + while (--numWorkers) + { + threads_.emplace_back([this] { ioc_.run(); }); + } +} diff --git a/src/rpc/WorkQueue.h b/src/rpc/WorkQueue.h new file mode 100644 index 00000000..236dd9ad --- /dev/null +++ b/src/rpc/WorkQueue.h @@ -0,0 +1,81 @@ +#ifndef CLIO_WORK_QUEUE_H +#define CLIO_WORK_QUEUE_H + +#include +#include +#include +#include + +#include +#include +#include +#include + +class WorkQueue +{ + // these are cumulative for the lifetime of the process + std::atomic_uint64_t queued_ = 0; + std::atomic_uint64_t durationUs_ = 0; + + std::atomic_uint64_t curSize_ = 0; + uint32_t maxSize_ = std::numeric_limits::max(); + +public: + WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0); + + template + bool + postCoro(F&& f, bool isWhiteListed) + { + if (curSize_ >= maxSize_ && !isWhiteListed) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ + << " queue is full. rejecting job. current size = " << curSize_ + << " max size = " << maxSize_; + return false; + } + ++curSize_; + auto start = std::chrono::system_clock::now(); + // Each time we enqueue a job, we want to post a symmetrical job that + // will dequeue and run the job at the front of the job queue. + boost::asio::spawn( + ioc_, + [this, f = std::move(f), start](boost::asio::yield_context yield) { + auto run = std::chrono::system_clock::now(); + auto wait = + std::chrono::duration_cast( + run - start) + .count(); + // increment queued_ here, in the same place we implement + // durationUs_ + ++queued_; + durationUs_ += wait; + BOOST_LOG_TRIVIAL(debug) << "WorkQueue wait time = " << wait + << " queue size = " << curSize_; + f(yield); + --curSize_; + }); + return true; + } + + // TODO: this is not actually being called. Wait for application refactor + boost::json::object + report() + { + boost::json::object obj; + obj["queued"] = queued_; + obj["queued_duration_us"] = durationUs_; + obj["current_queue_size"] = curSize_; + obj["max_queue_size"] = maxSize_; + return obj; + } + +private: + std::vector threads_ = {}; + + boost::asio::io_context ioc_ = {}; + std::optional work_{ioc_}; +}; + +#endif // CLIO_WORK_QUEUE_H diff --git a/src/webserver/DOSGuard.h b/src/webserver/DOSGuard.h index 2a5c0f5f..90fcf6c0 100644 --- a/src/webserver/DOSGuard.h +++ b/src/webserver/DOSGuard.h @@ -92,6 +92,12 @@ public: }); } + bool + isWhiteListed(std::string const& ip) + { + return whitelist_.contains(ip); + } + bool isOk(std::string const& ip) { diff --git a/src/webserver/HttpBase.h b/src/webserver/HttpBase.h index 0b16f12b..82fec61e 100644 --- a/src/webserver/HttpBase.h +++ b/src/webserver/HttpBase.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -92,6 +93,7 @@ class HttpBase std::shared_ptr etl_; DOSGuard& dosGuard_; RPC::Counters& counters_; + WorkQueue& workQueue_; send_lambda lambda_; protected: @@ -146,6 +148,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer buffer) : ioc_(ioc) , backend_(backend) @@ -154,6 +157,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , workQueue_(queue) , lambda_(*this) , buffer_(std::move(buffer)) { @@ -208,7 +212,8 @@ public: balancer_, etl_, dosGuard_, - counters_); + counters_, + workQueue_); } auto ip = derived().ip(); @@ -220,22 +225,32 @@ public: // Requests are handed using coroutines. Here we spawn a coroutine // which will asynchronously handle a request. - boost::asio::spawn( - derived().stream().get_executor(), - [this, ip, session](boost::asio::yield_context yield) { - handle_request( - yield, - std::move(req_), - lambda_, - backend_, - subscriptions_, - balancer_, - etl_, - dosGuard_, - counters_, - *ip, - session); - }); + if (!workQueue_.postCoro( + [this, ip, session](boost::asio::yield_context yield) { + handle_request( + yield, + std::move(req_), + lambda_, + backend_, + subscriptions_, + balancer_, + etl_, + dosGuard_, + counters_, + *ip, + session); + }, + dosGuard_.isWhiteListed(*ip))) + { + http::response res{ + http::status::ok, req_.version()}; + res.set(http::field::server, "clio-server-v0.0.0"); + res.set(http::field::content_type, "application/json"); + res.keep_alive(req_.keep_alive()); + res.body() = "Server overloaded"; + res.prepare_payload(); + lambda_(std::move(res)); + } } void diff --git a/src/webserver/HttpSession.h b/src/webserver/HttpSession.h index e88769ea..1cdf5ed7 100644 --- a/src/webserver/HttpSession.h +++ b/src/webserver/HttpSession.h @@ -25,6 +25,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer buffer) : HttpBase( ioc, @@ -34,6 +35,7 @@ public: etl, dosGuard, counters, + queue, std::move(buffer)) , stream_(std::move(socket)) { diff --git a/src/webserver/Listener.h b/src/webserver/Listener.h index fe8d2b89..2f7a2c3f 100644 --- a/src/webserver/Listener.h +++ b/src/webserver/Listener.h @@ -30,6 +30,7 @@ class Detector std::shared_ptr etl_; DOSGuard& dosGuard_; RPC::Counters& counters_; + WorkQueue& queue_; boost::beast::flat_buffer buffer_; public: @@ -42,7 +43,8 @@ public: std::shared_ptr balancer, std::shared_ptr etl, DOSGuard& dosGuard, - RPC::Counters& counters) + RPC::Counters& counters, + WorkQueue& queue) : ioc_(ioc) , stream_(std::move(socket)) , ctx_(ctx) @@ -52,6 +54,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , queue_(queue) { } @@ -101,6 +104,7 @@ public: etl_, dosGuard_, counters_, + queue_, std::move(buffer_)) ->run(); return; @@ -116,6 +120,7 @@ public: etl_, dosGuard_, counters_, + queue_, std::move(buffer_)) ->run(); } @@ -132,7 +137,8 @@ make_websocket_session( std::shared_ptr balancer, std::shared_ptr etl, DOSGuard& dosGuard, - RPC::Counters& counters) + RPC::Counters& counters, + WorkQueue& queue) { std::make_shared( ioc, @@ -143,6 +149,7 @@ make_websocket_session( etl, dosGuard, counters, + queue, std::move(buffer), std::move(req)) ->run(); @@ -159,7 +166,8 @@ make_websocket_session( std::shared_ptr balancer, std::shared_ptr etl, DOSGuard& dosGuard, - RPC::Counters& counters) + RPC::Counters& counters, + WorkQueue& queue) { std::make_shared( ioc, @@ -170,6 +178,7 @@ make_websocket_session( etl, dosGuard, counters, + queue, std::move(buffer), std::move(req)) ->run(); @@ -190,11 +199,14 @@ class Listener std::shared_ptr balancer_; std::shared_ptr etl_; DOSGuard& dosGuard_; + WorkQueue queue_; RPC::Counters counters_; public: Listener( boost::asio::io_context& ioc, + uint32_t numWorkerThreads, + uint32_t maxQueueSize, std::optional> ctx, tcp::endpoint endpoint, std::shared_ptr backend, @@ -210,6 +222,7 @@ public: , balancer_(balancer) , etl_(etl) , dosGuard_(dosGuard) + , queue_(numWorkerThreads, maxQueueSize) { boost::beast::error_code ec; @@ -271,7 +284,8 @@ private: balancer_, etl_, dosGuard_, - counters_) + counters_, + queue_) ->run(); } @@ -306,8 +320,19 @@ make_HttpServer( auto const port = static_cast(serverConfig.at("port").as_int64()); + uint32_t numThreads = std::thread::hardware_concurrency(); + if (serverConfig.contains("workers")) + numThreads = serverConfig.at("workers").as_int64(); + uint32_t maxQueueSize = 0; // no max + if (serverConfig.contains("max_queue_size")) + maxQueueSize = serverConfig.at("max_queue_size").as_int64(); + BOOST_LOG_TRIVIAL(info) << __func__ << " Number of workers = " << numThreads + << ". Max queue size = " << maxQueueSize; + auto server = std::make_shared( ioc, + numThreads, + maxQueueSize, sslCtx, boost::asio::ip::tcp::endpoint{address, port}, backend, diff --git a/src/webserver/PlainWsSession.h b/src/webserver/PlainWsSession.h index c39324a2..045ed590 100644 --- a/src/webserver/PlainWsSession.h +++ b/src/webserver/PlainWsSession.h @@ -38,6 +38,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& buffer) : WsSession( ioc, @@ -47,6 +48,7 @@ public: etl, dosGuard, counters, + queue, std::move(buffer)) , ws_(std::move(socket)) { @@ -91,6 +93,7 @@ class WsUpgrader : public std::enable_shared_from_this std::shared_ptr etl_; DOSGuard& dosGuard_; RPC::Counters& counters_; + WorkQueue& queue_; http::request req_; public: @@ -103,6 +106,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& b) : ioc_(ioc) , http_(std::move(socket)) @@ -113,6 +117,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , queue_(queue) { } WsUpgrader( @@ -124,6 +129,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& b, http::request req) : ioc_(ioc) @@ -135,6 +141,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , queue_(queue) , req_(std::move(req)) { } @@ -190,6 +197,7 @@ private: etl_, dosGuard_, counters_, + queue_, std::move(buffer_)) ->run(std::move(req_)); } diff --git a/src/webserver/SslHttpSession.h b/src/webserver/SslHttpSession.h index 19a700c2..8f763578 100644 --- a/src/webserver/SslHttpSession.h +++ b/src/webserver/SslHttpSession.h @@ -26,6 +26,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer buffer) : HttpBase( ioc, @@ -35,6 +36,7 @@ public: etl, dosGuard, counters, + queue, std::move(buffer)) , stream_(std::move(socket), ctx) { diff --git a/src/webserver/SslWsSession.h b/src/webserver/SslWsSession.h index 5f34758e..92213076 100644 --- a/src/webserver/SslWsSession.h +++ b/src/webserver/SslWsSession.h @@ -36,6 +36,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& b) : WsSession( ioc, @@ -45,6 +46,7 @@ public: etl, dosGuard, counters, + queue, std::move(b)) , ws_(std::move(stream)) { @@ -88,6 +90,7 @@ class SslWsUpgrader : public std::enable_shared_from_this std::shared_ptr etl_; DOSGuard& dosGuard_; RPC::Counters& counters_; + WorkQueue& queue_; http::request req_; public: @@ -101,6 +104,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& b) : ioc_(ioc) , https_(std::move(socket), ctx) @@ -111,6 +115,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , queue_(queue) { } SslWsUpgrader( @@ -122,6 +127,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& b, http::request req) : ioc_(ioc) @@ -133,6 +139,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , queue_(queue) , req_(std::move(req)) { } @@ -203,6 +210,7 @@ private: etl_, dosGuard_, counters_, + queue_, std::move(buffer_)) ->run(std::move(req_)); } diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index 2d4d313f..ba267ed6 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -86,6 +87,7 @@ class WsSession : public WsBase, std::shared_ptr etl_; DOSGuard& dosGuard_; RPC::Counters& counters_; + WorkQueue& queue_; std::mutex mtx_; bool sending_ = false; @@ -115,6 +117,7 @@ public: std::shared_ptr etl, DOSGuard& dosGuard, RPC::Counters& counters, + WorkQueue& queue, boost::beast::flat_buffer&& buffer) : buffer_(std::move(buffer)) , ioc_(ioc) @@ -124,6 +127,7 @@ public: , etl_(etl) , dosGuard_(dosGuard) , counters_(counters) + , queue_(queue) { } virtual ~WsSession() @@ -364,25 +368,29 @@ public: BOOST_LOG_TRIVIAL(debug) << __func__ << " received request from ip = " << *ip; - if (!dosGuard_.isOk(*ip)) - { + auto sendError = [&](auto&& msg) { boost::json::object response; - response["error"] = "Too many requests. Slow down"; + response["error"] = std::move(msg); std::string responseStr = boost::json::serialize(response); BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << responseStr; dosGuard_.add(*ip, responseStr.size()); send(std::move(responseStr)); + }; + if (!dosGuard_.isOk(*ip)) + { + sendError("Too many requests. Slow down"); } else { - boost::asio::spawn( - derived().ws().get_executor(), - [m = std::move(msg), shared_this = shared_from_this()]( - boost::asio::yield_context yield) { - shared_this->handle_request(std::move(m), yield); - }); + if (!queue_.postCoro( + [m = std::move(msg), shared_this = shared_from_this()]( + boost::asio::yield_context yield) { + shared_this->handle_request(std::move(m), yield); + }, + dosGuard_.isWhiteListed(*ip))) + sendError("Server overloaded"); } do_read();