diff --git a/CMakeLists.txt b/CMakeLists.txt index cd1eb392..6b619491 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,6 +38,7 @@ src/backend/PostgresBackend.cpp ## RPC src/rpc/RPC.cpp src/rpc/RPCHelpers.cpp + src/rpc/Counters.cpp ## RPC Methods # Account src/rpc/handlers/AccountChannels.cpp diff --git a/src/rpc/Counters.cpp b/src/rpc/Counters.cpp new file mode 100644 index 00000000..246790ce --- /dev/null +++ b/src/rpc/Counters.cpp @@ -0,0 +1,76 @@ +#include + +namespace RPC +{ + +void +Counters::initializeCounter(std::string const& method) +{ + std::shared_lock lk(mutex_); + if(methodInfo_.count(method) == 0) + { + lk.unlock(); + std::unique_lock ulk(mutex_); + + // This calls the default constructor for methodInfo of the method. + methodInfo_[method]; + } +} + +void +Counters::rpcErrored(std::string const& method) +{ + initializeCounter(method); + + std::shared_lock lk(mutex_); + MethodInfo& counters = methodInfo_[method]; + counters.started++; + counters.errored++; +} + +void +Counters::rpcComplete( + std::string const& method, + std::chrono::microseconds const& rpcDuration) +{ + initializeCounter(method); + + std::shared_lock lk(mutex_); + MethodInfo& counters = methodInfo_[method]; + counters.started++; + counters.finished++; + counters.duration += rpcDuration.count(); +} + +void +Counters::rpcForwarded(std::string const& method) +{ + initializeCounter(method); + + std::shared_lock lk(mutex_); + MethodInfo& counters = methodInfo_[method]; + counters.forwarded++; +} + +boost::json::object +Counters::report() +{ + std::shared_lock lk(mutex_); + boost::json::object obj = {}; + + for (auto const& [method, info] : methodInfo_) + { + boost::json::object counters = {}; + counters["started"] = std::to_string(info.started); + counters["finished"] = std::to_string(info.finished); + counters["errored"] = std::to_string(info.errored); + counters["forwarded"] = std::to_string(info.forwarded); + counters["duration_us"] = std::to_string(info.duration); + + obj[method] = std::move(counters); + } + + return obj; +} + +} // namespace RPC \ No newline at end of file diff --git a/src/rpc/Counters.h b/src/rpc/Counters.h new file mode 100644 index 00000000..ed8eb994 --- /dev/null +++ b/src/rpc/Counters.h @@ -0,0 +1,53 @@ +#ifndef RPC_COUNTERS_H +#define RPC_COUNTERS_H + +#include +#include +#include +#include +#include + +namespace RPC +{ + +class Counters +{ +private: + struct MethodInfo + { + MethodInfo() = default; + + std::atomic_uint started{0}; + std::atomic_uint finished{0}; + std::atomic_uint errored{0}; + std::atomic_uint forwarded{0}; + std::atomic_uint duration{0}; + }; + + void + initializeCounter(std::string const& method); + + std::shared_mutex mutex_; + std::unordered_map methodInfo_; + +public: + Counters() = default; + + void + rpcErrored(std::string const& method); + + void + rpcComplete( + std::string const& method, + std::chrono::microseconds const& rpcDuration); + + void + rpcForwarded(std::string const& method); + + boost::json::object + report(); +}; + +} // namespace RPCs + +#endif // RPC_COUNTERS_H \ No newline at end of file diff --git a/src/rpc/RPC.cpp b/src/rpc/RPC.cpp index 9f0744ae..89d6f2a8 100644 --- a/src/rpc/RPC.cpp +++ b/src/rpc/RPC.cpp @@ -10,7 +10,8 @@ make_WsContext( std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, std::shared_ptr const& session, - Backend::LedgerRange const& range) + Backend::LedgerRange const& range, + Counters& counters) { if (!request.contains("command")) return {}; @@ -18,7 +19,15 @@ make_WsContext( std::string command = request.at("command").as_string().c_str(); return Context{ - command, 1, request, backend, subscriptions, balancer, session, range}; + command, + 1, + request, + backend, + subscriptions, + balancer, + session, + range, + counters}; } std::optional @@ -27,7 +36,8 @@ make_HttpContext( std::shared_ptr const& backend, std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, - Backend::LedgerRange const& range) + Backend::LedgerRange const& range, + RPC::Counters& counters) { if (!request.contains("method") || !request.at("method").is_string()) return {}; @@ -56,7 +66,8 @@ make_HttpContext( subscriptions, balancer, nullptr, - range}; + range, + counters}; } boost::json::object @@ -152,10 +163,14 @@ buildResponse(Context const& ctx) if (shouldForwardToRippled(ctx)) { auto res = ctx.balancer->forwardToRippled(ctx.params); + + ctx.counters.rpcForwarded(ctx.method); + if (!res) return Status{Error::rpcFAILED_TO_FORWARD}; return *res; } + if (ctx.method == "ping") return boost::json::object{}; diff --git a/src/rpc/RPC.h b/src/rpc/RPC.h index cbb27333..4ce270d7 100644 --- a/src/rpc/RPC.h +++ b/src/rpc/RPC.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,7 @@ struct Context std::shared_ptr subscriptions; std::shared_ptr session; Backend::LedgerRange const& range; + Counters& counters; Context( std::string const& command_, @@ -46,7 +48,8 @@ struct Context std::shared_ptr const& subscriptions_, std::shared_ptr const& balancer_, std::shared_ptr const& session_, - Backend::LedgerRange const& range_) + Backend::LedgerRange const& range_, + Counters& counters_) : method(command_) , version(version_) , params(params_) @@ -55,6 +58,7 @@ struct Context , balancer(balancer_) , session(session_) , range(range_) + , counters(counters_) { } }; @@ -133,7 +137,8 @@ make_WsContext( std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, std::shared_ptr const& session, - Backend::LedgerRange const& range); + Backend::LedgerRange const& range, + Counters& counters); std::optional make_HttpContext( @@ -141,7 +146,8 @@ make_HttpContext( std::shared_ptr const& backend, std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, - Backend::LedgerRange const& range); + Backend::LedgerRange const& range, + Counters& counters); Result buildResponse(Context const& ctx); diff --git a/src/rpc/handlers/ServerInfo.cpp b/src/rpc/handlers/ServerInfo.cpp index f6fb0453..eb97eaf8 100644 --- a/src/rpc/handlers/ServerInfo.cpp +++ b/src/rpc/handlers/ServerInfo.cpp @@ -21,9 +21,14 @@ doServerInfo(Context const& context) else { response["info"] = boost::json::object{}; - response["info"].as_object()["complete_ledgers"] = + boost::json::object& info = response["info"].as_object(); + + info["complete_ledgers"] = std::to_string(range->minSequence) + "-" + std::to_string(range->maxSequence); + + info["counters"] = boost::json::object{}; + info["counters"].as_object()["rpc"] = context.counters.report(); } auto serverInfoRippled = context.balancer->forwardToRippled(context.params); diff --git a/src/webserver/HttpBase.h b/src/webserver/HttpBase.h index c2896f7a..5a0a24fa 100644 --- a/src/webserver/HttpBase.h +++ b/src/webserver/HttpBase.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace http = boost::beast::http; namespace net = boost::asio; @@ -71,6 +72,7 @@ handle_request( std::shared_ptr backend, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, std::string const& ip) { auto const httpResponse = [&req]( @@ -140,7 +142,7 @@ handle_request( RPC::make_error(RPC::Error::rpcNOT_READY)))); std::optional context = - RPC::make_HttpContext(request, backend, nullptr, balancer, *range); + RPC::make_HttpContext(request, backend, nullptr, balancer, *range, counters); if (!context) return send(httpResponse( @@ -151,11 +153,16 @@ handle_request( boost::json::object response{{"result", boost::json::object{}}}; boost::json::object& result = response["result"].as_object(); - + + auto start = std::chrono::system_clock::now(); auto v = RPC::buildResponse(*context); + auto end = std::chrono::system_clock::now(); + auto us = std::chrono::duration_cast( + end - start); if (auto status = std::get_if(&v)) { + counters.rpcErrored(context->method); auto error = RPC::make_error(*status); error["request"] = request; @@ -168,6 +175,7 @@ handle_request( } else { + counters.rpcComplete(context->method, us); result = std::get(v); result["status"] = "success"; result["validated"] = true; @@ -243,6 +251,7 @@ class HttpBase std::shared_ptr subscriptions_; std::shared_ptr balancer_; DOSGuard& dosGuard_; + RPC::Counters& counters_; send_lambda lambda_; protected: @@ -254,11 +263,13 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer buffer) : backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) , lambda_(*this) , buffer_(std::move(buffer)) { @@ -308,14 +319,21 @@ public: backend_, subscriptions_, balancer_, - dosGuard_); + dosGuard_, + counters_); } auto ip = derived().ip(); // Send the response handle_request( - std::move(req_), lambda_, backend_, balancer_, dosGuard_, ip); + std::move(req_), + lambda_, + backend_, + balancer_, + dosGuard_, + counters_, + ip); } void diff --git a/src/webserver/HttpSession.h b/src/webserver/HttpSession.h index bcd53413..0da64f75 100644 --- a/src/webserver/HttpSession.h +++ b/src/webserver/HttpSession.h @@ -22,12 +22,14 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer buffer) : HttpBase( backend, subscriptions, balancer, dosGuard, + counters, std::move(buffer)) , stream_(std::move(socket)) { diff --git a/src/webserver/Listener.h b/src/webserver/Listener.h index 91673625..46bed133 100644 --- a/src/webserver/Listener.h +++ b/src/webserver/Listener.h @@ -27,6 +27,7 @@ class Detector std::shared_ptr subscriptions_; std::shared_ptr balancer_; DOSGuard& dosGuard_; + RPC::Counters& counters_; boost::beast::flat_buffer buffer_; public: @@ -36,13 +37,15 @@ public: std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr balancer, - DOSGuard& dosGuard) + DOSGuard& dosGuard, + RPC::Counters& counters) : stream_(std::move(socket)) , ctx_(ctx) , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) { } @@ -79,6 +82,7 @@ public: subscriptions_, balancer_, dosGuard_, + counters_, std::move(buffer_)) ->run(); return; @@ -91,6 +95,7 @@ public: subscriptions_, balancer_, dosGuard_, + counters_, std::move(buffer_)) ->run(); } @@ -104,7 +109,8 @@ make_websocket_session( std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr balancer, - DOSGuard& dosGuard) + DOSGuard& dosGuard, + RPC::Counters& counters) { std::make_shared( std::move(stream), @@ -112,6 +118,7 @@ make_websocket_session( subscriptions, balancer, dosGuard, + counters, std::move(buffer), std::move(req)) ->run(); @@ -125,7 +132,8 @@ make_websocket_session( std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr balancer, - DOSGuard& dosGuard) + DOSGuard& dosGuard, + RPC::Counters& counters) { std::make_shared( std::move(stream), @@ -133,6 +141,7 @@ make_websocket_session( subscriptions, balancer, dosGuard, + counters, std::move(buffer), std::move(req)) ->run(); @@ -152,6 +161,7 @@ class Listener std::shared_ptr subscriptions_; std::shared_ptr balancer_; DOSGuard& dosGuard_; + RPC::Counters counters_; public: Listener( @@ -243,7 +253,8 @@ private: backend_, subscriptions_, balancer_, - dosGuard_) + dosGuard_, + counters_) ->run(); } diff --git a/src/webserver/PlainWsSession.h b/src/webserver/PlainWsSession.h index 692266d8..21052c14 100644 --- a/src/webserver/PlainWsSession.h +++ b/src/webserver/PlainWsSession.h @@ -35,12 +35,14 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& buffer) : WsSession( backend, subscriptions, balancer, dosGuard, + counters, std::move(buffer)) , ws_(std::move(socket)) { @@ -75,6 +77,7 @@ class WsUpgrader : public std::enable_shared_from_this std::shared_ptr subscriptions_; std::shared_ptr balancer_; DOSGuard& dosGuard_; + RPC::Counters& counters_; http::request req_; public: @@ -84,12 +87,14 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& b) : http_(std::move(socket)) , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) , buffer_(std::move(b)) { } @@ -99,6 +104,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& b, http::request req) : http_(std::move(stream)) @@ -106,6 +112,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) , buffer_(std::move(b)) , req_(std::move(req)) { @@ -159,6 +166,7 @@ private: subscriptions_, balancer_, dosGuard_, + counters_, std::move(buffer_)) ->run(std::move(req_)); } diff --git a/src/webserver/SslHttpSession.h b/src/webserver/SslHttpSession.h index 5e7483ef..fa4a80bb 100644 --- a/src/webserver/SslHttpSession.h +++ b/src/webserver/SslHttpSession.h @@ -23,12 +23,14 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer buffer) : HttpBase( backend, subscriptions, balancer, dosGuard, + counters, std::move(buffer)) , stream_(std::move(socket), ctx) { diff --git a/src/webserver/SslWsSession.h b/src/webserver/SslWsSession.h index 28b7aa81..761eea13 100644 --- a/src/webserver/SslWsSession.h +++ b/src/webserver/SslWsSession.h @@ -33,8 +33,15 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& b) - : WsSession(backend, subscriptions, balancer, dosGuard, std::move(b)) + : WsSession( + backend, + subscriptions, + balancer, + dosGuard, + counters, + std::move(b)) , ws_(std::move(stream)) { } @@ -66,6 +73,7 @@ class SslWsUpgrader : public std::enable_shared_from_this std::shared_ptr subscriptions_; std::shared_ptr balancer_; DOSGuard& dosGuard_; + RPC::Counters& counters_; http::request req_; public: @@ -76,12 +84,14 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& b) : https_(std::move(socket), ctx) , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) , buffer_(std::move(b)) { } @@ -91,6 +101,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& b, http::request req) : https_(std::move(stream)) @@ -98,6 +109,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) , buffer_(std::move(b)) , req_(std::move(req)) { @@ -166,6 +178,7 @@ private: subscriptions_, balancer_, dosGuard_, + counters_, std::move(buffer_)) ->run(std::move(req_)); } diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index 955278c9..e7c04c9c 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -11,6 +11,7 @@ #include #include #include +#include #include namespace http = boost::beast::http; @@ -86,6 +87,7 @@ class WsSession : public WsBase, std::weak_ptr subscriptions_; std::shared_ptr balancer_; DOSGuard& dosGuard_; + RPC::Counters& counters_; std::mutex mtx_; std::queue messages_; @@ -95,11 +97,13 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard, + RPC::Counters& counters, boost::beast::flat_buffer&& buffer) : backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , dosGuard_(dosGuard) + , counters_(counters) , buffer_(std::move(buffer)) { } @@ -238,7 +242,8 @@ public: subscriptions_.lock(), balancer_, shared_from_this(), - *range); + *range, + counters_); if (!context) return sendError(RPC::Error::rpcBAD_SYNTAX); @@ -250,10 +255,16 @@ public: boost::json::object& result = response["result"].as_object(); + auto start = std::chrono::system_clock::now(); auto v = RPC::buildResponse(*context); + auto end = std::chrono::system_clock::now(); + auto us = + std::chrono::duration_cast( + end - start); if (auto status = std::get_if(&v)) { + counters_.rpcErrored(context->method); auto error = RPC::make_error(*status); if (!id.is_null()) @@ -263,6 +274,7 @@ public: } else { + counters_.rpcComplete(context->method, us); result = std::get(v); } }