Report RPC counts in server_info (#72)

This commit is contained in:
Nathan Nichols
2022-01-05 11:58:17 -06:00
committed by GitHub
parent 2f0b3235ee
commit 41e412302b
13 changed files with 240 additions and 18 deletions

76
src/rpc/Counters.cpp Normal file
View File

@@ -0,0 +1,76 @@
#include <rpc/Counters.h>
namespace RPC
{
void
Counters::initializeCounter(std::string const& method)
{
std::shared_lock lk(mutex_);
if(methodInfo_.count(method) == 0)
{
lk.unlock();
std::unique_lock ulk(mutex_);
// This calls the default constructor for methodInfo of the method.
methodInfo_[method];
}
}
void
Counters::rpcErrored(std::string const& method)
{
initializeCounter(method);
std::shared_lock lk(mutex_);
MethodInfo& counters = methodInfo_[method];
counters.started++;
counters.errored++;
}
void
Counters::rpcComplete(
std::string const& method,
std::chrono::microseconds const& rpcDuration)
{
initializeCounter(method);
std::shared_lock lk(mutex_);
MethodInfo& counters = methodInfo_[method];
counters.started++;
counters.finished++;
counters.duration += rpcDuration.count();
}
void
Counters::rpcForwarded(std::string const& method)
{
initializeCounter(method);
std::shared_lock lk(mutex_);
MethodInfo& counters = methodInfo_[method];
counters.forwarded++;
}
boost::json::object
Counters::report()
{
std::shared_lock lk(mutex_);
boost::json::object obj = {};
for (auto const& [method, info] : methodInfo_)
{
boost::json::object counters = {};
counters["started"] = std::to_string(info.started);
counters["finished"] = std::to_string(info.finished);
counters["errored"] = std::to_string(info.errored);
counters["forwarded"] = std::to_string(info.forwarded);
counters["duration_us"] = std::to_string(info.duration);
obj[method] = std::move(counters);
}
return obj;
}
} // namespace RPC

53
src/rpc/Counters.h Normal file
View File

@@ -0,0 +1,53 @@
#ifndef RPC_COUNTERS_H
#define RPC_COUNTERS_H
#include <chrono>
#include <cstdint>
#include <string>
#include <shared_mutex>
#include <boost/json.hpp>
namespace RPC
{
class Counters
{
private:
struct MethodInfo
{
MethodInfo() = default;
std::atomic_uint started{0};
std::atomic_uint finished{0};
std::atomic_uint errored{0};
std::atomic_uint forwarded{0};
std::atomic_uint duration{0};
};
void
initializeCounter(std::string const& method);
std::shared_mutex mutex_;
std::unordered_map<std::string, MethodInfo> methodInfo_;
public:
Counters() = default;
void
rpcErrored(std::string const& method);
void
rpcComplete(
std::string const& method,
std::chrono::microseconds const& rpcDuration);
void
rpcForwarded(std::string const& method);
boost::json::object
report();
};
} // namespace RPCs
#endif // RPC_COUNTERS_H

View File

@@ -10,7 +10,8 @@ make_WsContext(
std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<WsBase> const& session,
Backend::LedgerRange const& range)
Backend::LedgerRange const& range,
Counters& counters)
{
if (!request.contains("command"))
return {};
@@ -18,7 +19,15 @@ make_WsContext(
std::string command = request.at("command").as_string().c_str();
return Context{
command, 1, request, backend, subscriptions, balancer, session, range};
command,
1,
request,
backend,
subscriptions,
balancer,
session,
range,
counters};
}
std::optional<Context>
@@ -27,7 +36,8 @@ make_HttpContext(
std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer,
Backend::LedgerRange const& range)
Backend::LedgerRange const& range,
RPC::Counters& counters)
{
if (!request.contains("method") || !request.at("method").is_string())
return {};
@@ -56,7 +66,8 @@ make_HttpContext(
subscriptions,
balancer,
nullptr,
range};
range,
counters};
}
boost::json::object
@@ -152,10 +163,14 @@ buildResponse(Context const& ctx)
if (shouldForwardToRippled(ctx))
{
auto res = ctx.balancer->forwardToRippled(ctx.params);
ctx.counters.rpcForwarded(ctx.method);
if (!res)
return Status{Error::rpcFAILED_TO_FORWARD};
return *res;
}
if (ctx.method == "ping")
return boost::json::object{};

View File

@@ -4,6 +4,7 @@
#include <ripple/protocol/ErrorCodes.h>
#include <boost/json.hpp>
#include <backend/BackendInterface.h>
#include <rpc/Counters.h>
#include <optional>
#include <string>
#include <variant>
@@ -37,6 +38,7 @@ struct Context
std::shared_ptr<SubscriptionManager> subscriptions;
std::shared_ptr<WsBase> session;
Backend::LedgerRange const& range;
Counters& counters;
Context(
std::string const& command_,
@@ -46,7 +48,8 @@ struct Context
std::shared_ptr<SubscriptionManager> const& subscriptions_,
std::shared_ptr<ETLLoadBalancer> const& balancer_,
std::shared_ptr<WsBase> const& session_,
Backend::LedgerRange const& range_)
Backend::LedgerRange const& range_,
Counters& counters_)
: method(command_)
, version(version_)
, params(params_)
@@ -55,6 +58,7 @@ struct Context
, balancer(balancer_)
, session(session_)
, range(range_)
, counters(counters_)
{
}
};
@@ -133,7 +137,8 @@ make_WsContext(
std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<WsBase> const& session,
Backend::LedgerRange const& range);
Backend::LedgerRange const& range,
Counters& counters);
std::optional<Context>
make_HttpContext(
@@ -141,7 +146,8 @@ make_HttpContext(
std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer,
Backend::LedgerRange const& range);
Backend::LedgerRange const& range,
Counters& counters);
Result
buildResponse(Context const& ctx);

View File

@@ -21,9 +21,14 @@ doServerInfo(Context const& context)
else
{
response["info"] = boost::json::object{};
response["info"].as_object()["complete_ledgers"] =
boost::json::object& info = response["info"].as_object();
info["complete_ledgers"] =
std::to_string(range->minSequence) + "-" +
std::to_string(range->maxSequence);
info["counters"] = boost::json::object{};
info["counters"].as_object()["rpc"] = context.counters.report();
}
auto serverInfoRippled = context.balancer->forwardToRippled(context.params);

View File

@@ -20,6 +20,7 @@
#include <rpc/RPC.h>
#include <vector>
#include <webserver/DOSGuard.h>
#include <rpc/Counters.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
@@ -71,6 +72,7 @@ handle_request(
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
std::string const& ip)
{
auto const httpResponse = [&req](
@@ -140,7 +142,7 @@ handle_request(
RPC::make_error(RPC::Error::rpcNOT_READY))));
std::optional<RPC::Context> context =
RPC::make_HttpContext(request, backend, nullptr, balancer, *range);
RPC::make_HttpContext(request, backend, nullptr, balancer, *range, counters);
if (!context)
return send(httpResponse(
@@ -151,11 +153,16 @@ handle_request(
boost::json::object response{{"result", boost::json::object{}}};
boost::json::object& result = response["result"].as_object();
auto start = std::chrono::system_clock::now();
auto v = RPC::buildResponse(*context);
auto end = std::chrono::system_clock::now();
auto us = std::chrono::duration_cast<std::chrono::microseconds>(
end - start);
if (auto status = std::get_if<RPC::Status>(&v))
{
counters.rpcErrored(context->method);
auto error = RPC::make_error(*status);
error["request"] = request;
@@ -168,6 +175,7 @@ handle_request(
}
else
{
counters.rpcComplete(context->method, us);
result = std::get<boost::json::object>(v);
result["status"] = "success";
result["validated"] = true;
@@ -243,6 +251,7 @@ class HttpBase
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
send_lambda lambda_;
protected:
@@ -254,11 +263,13 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer buffer)
: backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
, lambda_(*this)
, buffer_(std::move(buffer))
{
@@ -308,14 +319,21 @@ public:
backend_,
subscriptions_,
balancer_,
dosGuard_);
dosGuard_,
counters_);
}
auto ip = derived().ip();
// Send the response
handle_request(
std::move(req_), lambda_, backend_, balancer_, dosGuard_, ip);
std::move(req_),
lambda_,
backend_,
balancer_,
dosGuard_,
counters_,
ip);
}
void

View File

@@ -22,12 +22,14 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer buffer)
: HttpBase<HttpSession>(
backend,
subscriptions,
balancer,
dosGuard,
counters,
std::move(buffer))
, stream_(std::move(socket))
{

View File

@@ -27,6 +27,7 @@ class Detector
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
boost::beast::flat_buffer buffer_;
public:
@@ -36,13 +37,15 @@ public:
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
DOSGuard& dosGuard,
RPC::Counters& counters)
: stream_(std::move(socket))
, ctx_(ctx)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
{
}
@@ -79,6 +82,7 @@ public:
subscriptions_,
balancer_,
dosGuard_,
counters_,
std::move(buffer_))
->run();
return;
@@ -91,6 +95,7 @@ public:
subscriptions_,
balancer_,
dosGuard_,
counters_,
std::move(buffer_))
->run();
}
@@ -104,7 +109,8 @@ make_websocket_session(
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
DOSGuard& dosGuard,
RPC::Counters& counters)
{
std::make_shared<WsUpgrader>(
std::move(stream),
@@ -112,6 +118,7 @@ make_websocket_session(
subscriptions,
balancer,
dosGuard,
counters,
std::move(buffer),
std::move(req))
->run();
@@ -125,7 +132,8 @@ make_websocket_session(
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
DOSGuard& dosGuard,
RPC::Counters& counters)
{
std::make_shared<SslWsUpgrader>(
std::move(stream),
@@ -133,6 +141,7 @@ make_websocket_session(
subscriptions,
balancer,
dosGuard,
counters,
std::move(buffer),
std::move(req))
->run();
@@ -152,6 +161,7 @@ class Listener
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
RPC::Counters counters_;
public:
Listener(
@@ -243,7 +253,8 @@ private:
backend_,
subscriptions_,
balancer_,
dosGuard_)
dosGuard_,
counters_)
->run();
}

View File

@@ -35,12 +35,14 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& buffer)
: WsSession(
backend,
subscriptions,
balancer,
dosGuard,
counters,
std::move(buffer))
, ws_(std::move(socket))
{
@@ -75,6 +77,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
http::request<http::string_body> req_;
public:
@@ -84,12 +87,14 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& b)
: http_(std::move(socket))
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
, buffer_(std::move(b))
{
}
@@ -99,6 +104,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& b,
http::request<http::string_body> req)
: http_(std::move(stream))
@@ -106,6 +112,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
, buffer_(std::move(b))
, req_(std::move(req))
{
@@ -159,6 +166,7 @@ private:
subscriptions_,
balancer_,
dosGuard_,
counters_,
std::move(buffer_))
->run(std::move(req_));
}

View File

@@ -23,12 +23,14 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer buffer)
: HttpBase<SslHttpSession>(
backend,
subscriptions,
balancer,
dosGuard,
counters,
std::move(buffer))
, stream_(std::move(socket), ctx)
{

View File

@@ -33,8 +33,15 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& b)
: WsSession(backend, subscriptions, balancer, dosGuard, std::move(b))
: WsSession(
backend,
subscriptions,
balancer,
dosGuard,
counters,
std::move(b))
, ws_(std::move(stream))
{
}
@@ -66,6 +73,7 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
http::request<http::string_body> req_;
public:
@@ -76,12 +84,14 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& b)
: https_(std::move(socket), ctx)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
, buffer_(std::move(b))
{
}
@@ -91,6 +101,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& b,
http::request<http::string_body> req)
: https_(std::move(stream))
@@ -98,6 +109,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
, buffer_(std::move(b))
, req_(std::move(req))
{
@@ -166,6 +178,7 @@ private:
subscriptions_,
balancer_,
dosGuard_,
counters_,
std::move(buffer_))
->run(std::move(req_));
}

View File

@@ -11,6 +11,7 @@
#include <etl/ETLSource.h>
#include <rpc/RPC.h>
#include <webserver/DOSGuard.h>
#include <rpc/Counters.h>
#include <subscriptions/SubscriptionManager.h>
namespace http = boost::beast::http;
@@ -86,6 +87,7 @@ class WsSession : public WsBase,
std::weak_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
std::mutex mtx_;
std::queue<std::string> messages_;
@@ -95,11 +97,13 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
RPC::Counters& counters,
boost::beast::flat_buffer&& buffer)
: backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, counters_(counters)
, buffer_(std::move(buffer))
{
}
@@ -238,7 +242,8 @@ public:
subscriptions_.lock(),
balancer_,
shared_from_this(),
*range);
*range,
counters_);
if (!context)
return sendError(RPC::Error::rpcBAD_SYNTAX);
@@ -250,10 +255,16 @@ public:
boost::json::object& result =
response["result"].as_object();
auto start = std::chrono::system_clock::now();
auto v = RPC::buildResponse(*context);
auto end = std::chrono::system_clock::now();
auto us =
std::chrono::duration_cast<std::chrono::microseconds>(
end - start);
if (auto status = std::get_if<RPC::Status>(&v))
{
counters_.rpcErrored(context->method);
auto error = RPC::make_error(*status);
if (!id.is_null())
@@ -263,6 +274,7 @@ public:
}
else
{
counters_.rpcComplete(context->method, us);
result = std::get<boost::json::object>(v);
}
}