mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-15 01:05:51 +00:00
feat: Add load balancer prometheus metrics (#2096)
Fix: https://github.com/XRPLF/clio/issues/2070
This commit is contained in:
@@ -28,12 +28,14 @@
|
|||||||
#include "rpc/Errors.hpp"
|
#include "rpc/Errors.hpp"
|
||||||
#include "util/Assert.hpp"
|
#include "util/Assert.hpp"
|
||||||
#include "util/CoroutineGroup.hpp"
|
#include "util/CoroutineGroup.hpp"
|
||||||
|
#include "util/Profiler.hpp"
|
||||||
#include "util/Random.hpp"
|
#include "util/Random.hpp"
|
||||||
#include "util/ResponseExpirationCache.hpp"
|
#include "util/ResponseExpirationCache.hpp"
|
||||||
#include "util/log/Logger.hpp"
|
#include "util/log/Logger.hpp"
|
||||||
#include "util/newconfig/ArrayView.hpp"
|
#include "util/newconfig/ArrayView.hpp"
|
||||||
#include "util/newconfig/ConfigDefinition.hpp"
|
#include "util/newconfig/ConfigDefinition.hpp"
|
||||||
#include "util/newconfig/ObjectView.hpp"
|
#include "util/newconfig/ObjectView.hpp"
|
||||||
|
#include "util/prometheus/Label.hpp"
|
||||||
|
|
||||||
#include <boost/asio/io_context.hpp>
|
#include <boost/asio/io_context.hpp>
|
||||||
#include <boost/asio/spawn.hpp>
|
#include <boost/asio/spawn.hpp>
|
||||||
@@ -58,9 +60,14 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
using namespace util::config;
|
using namespace util::config;
|
||||||
|
using namespace util::prometheus;
|
||||||
|
|
||||||
namespace etl {
|
namespace etl {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
std::vector<std::int64_t> const kHISTOGRAM_BUCKETS{1, 2, 5, 10, 20, 50, 100, 200, 500, 700, 1000};
|
||||||
|
}
|
||||||
|
|
||||||
std::shared_ptr<etlng::LoadBalancerInterface>
|
std::shared_ptr<etlng::LoadBalancerInterface>
|
||||||
LoadBalancer::makeLoadBalancer(
|
LoadBalancer::makeLoadBalancer(
|
||||||
ClioConfigDefinition const& config,
|
ClioConfigDefinition const& config,
|
||||||
@@ -84,6 +91,27 @@ LoadBalancer::LoadBalancer(
|
|||||||
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
|
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
|
||||||
SourceFactory sourceFactory
|
SourceFactory sourceFactory
|
||||||
)
|
)
|
||||||
|
: forwardedDurationHistogram_(PrometheusService::histogramInt(
|
||||||
|
"lb_forwarded_duration_milliseconds_histogram",
|
||||||
|
Labels(),
|
||||||
|
kHISTOGRAM_BUCKETS,
|
||||||
|
"The duration of processing forwarded requests"
|
||||||
|
))
|
||||||
|
, forwardedRetryCounter_(PrometheusService::counterInt(
|
||||||
|
"lb_forwarded_retry_counter",
|
||||||
|
Labels(),
|
||||||
|
"The number of retries before a forwarded request was successful. Initial attempt excluded"
|
||||||
|
))
|
||||||
|
, cacheTriedCounter_(PrometheusService::counterInt(
|
||||||
|
"lb_cache_tried_counter",
|
||||||
|
Labels(),
|
||||||
|
"The number of requests that we tried to serve from the cache"
|
||||||
|
))
|
||||||
|
, cacheMissCounter_(PrometheusService::counterInt(
|
||||||
|
"lb_cache_miss_counter",
|
||||||
|
Labels(),
|
||||||
|
"The number of requests that were not served from the cache"
|
||||||
|
))
|
||||||
{
|
{
|
||||||
auto const forwardingCacheTimeout = config.get<float>("forwarding.cache_timeout");
|
auto const forwardingCacheTimeout = config.get<float>("forwarding.cache_timeout");
|
||||||
if (forwardingCacheTimeout > 0.f) {
|
if (forwardingCacheTimeout > 0.f) {
|
||||||
@@ -170,11 +198,6 @@ LoadBalancer::LoadBalancer(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LoadBalancer::~LoadBalancer()
|
|
||||||
{
|
|
||||||
sources_.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<std::string>
|
std::vector<std::string>
|
||||||
LoadBalancer::loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter)
|
LoadBalancer::loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter)
|
||||||
{
|
{
|
||||||
@@ -242,6 +265,8 @@ LoadBalancer::forwardToRippled(
|
|||||||
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
|
auto const cmd = boost::json::value_to<std::string>(request.at("command"));
|
||||||
|
|
||||||
if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
|
if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
|
||||||
|
++cacheTriedCounter_.get();
|
||||||
|
|
||||||
auto updater =
|
auto updater =
|
||||||
[this, &request, &clientIp, isAdmin](boost::asio::yield_context yield
|
[this, &request, &clientIp, isAdmin](boost::asio::yield_context yield
|
||||||
) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
|
) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
|
||||||
@@ -369,6 +394,8 @@ LoadBalancer::forwardToRippledImpl(
|
|||||||
boost::asio::yield_context yield
|
boost::asio::yield_context yield
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
++cacheMissCounter_.get();
|
||||||
|
|
||||||
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
|
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
|
||||||
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
|
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
|
||||||
|
|
||||||
@@ -379,11 +406,15 @@ LoadBalancer::forwardToRippledImpl(
|
|||||||
std::optional<boost::json::object> response;
|
std::optional<boost::json::object> response;
|
||||||
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
|
rpc::ClioError error = rpc::ClioError::EtlConnectionError;
|
||||||
while (numAttempts < sources_.size()) {
|
while (numAttempts < sources_.size()) {
|
||||||
auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
|
auto [res, duration] =
|
||||||
|
util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
|
||||||
|
forwardedDurationHistogram_.get().observe(duration);
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
response = std::move(res).value();
|
response = std::move(res).value();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
++forwardedRetryCounter_.get();
|
||||||
error = std::max(error, res.error()); // Choose the best result between all sources
|
error = std::max(error, res.error()); // Choose the best result between all sources
|
||||||
|
|
||||||
sourceIdx = (sourceIdx + 1) % sources_.size();
|
sourceIdx = (sourceIdx + 1) % sources_.size();
|
||||||
|
|||||||
@@ -32,6 +32,8 @@
|
|||||||
#include "util/ResponseExpirationCache.hpp"
|
#include "util/ResponseExpirationCache.hpp"
|
||||||
#include "util/log/Logger.hpp"
|
#include "util/log/Logger.hpp"
|
||||||
#include "util/newconfig/ConfigDefinition.hpp"
|
#include "util/newconfig/ConfigDefinition.hpp"
|
||||||
|
#include "util/prometheus/Counter.hpp"
|
||||||
|
#include "util/prometheus/Histogram.hpp"
|
||||||
|
|
||||||
#include <boost/asio.hpp>
|
#include <boost/asio.hpp>
|
||||||
#include <boost/asio/io_context.hpp>
|
#include <boost/asio/io_context.hpp>
|
||||||
@@ -92,7 +94,12 @@ private:
|
|||||||
std::uint32_t downloadRanges_ =
|
std::uint32_t downloadRanges_ =
|
||||||
kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
|
kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
|
||||||
|
|
||||||
// Using mutext instead of atomic_bool because choosing a new source to
|
std::reference_wrapper<util::prometheus::HistogramInt> forwardedDurationHistogram_;
|
||||||
|
std::reference_wrapper<util::prometheus::CounterInt> forwardedRetryCounter_;
|
||||||
|
std::reference_wrapper<util::prometheus::CounterInt> cacheTriedCounter_;
|
||||||
|
std::reference_wrapper<util::prometheus::CounterInt> cacheMissCounter_;
|
||||||
|
|
||||||
|
// Using mutex instead of atomic_bool because choosing a new source to
|
||||||
// forward messages should be done with a mutual exclusion otherwise there will be a race condition
|
// forward messages should be done with a mutual exclusion otherwise there will be a race condition
|
||||||
util::Mutex<bool> hasForwardingSource_{false};
|
util::Mutex<bool> hasForwardingSource_{false};
|
||||||
|
|
||||||
@@ -147,8 +154,6 @@ public:
|
|||||||
SourceFactory sourceFactory = makeSource
|
SourceFactory sourceFactory = makeSource
|
||||||
);
|
);
|
||||||
|
|
||||||
~LoadBalancer() override;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Load the initial ledger, writing data to the queue.
|
* @brief Load the initial ledger, writing data to the queue.
|
||||||
* @note This function will retry indefinitely until the ledger is downloaded.
|
* @note This function will retry indefinitely until the ledger is downloaded.
|
||||||
|
|||||||
@@ -267,7 +267,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been
|
* @brief Get the sequence of the last scheduled ledger to publish, Be aware that the ledger may not have been
|
||||||
* published to network
|
* published to network
|
||||||
*/
|
*/
|
||||||
std::optional<uint32_t>
|
std::optional<uint32_t>
|
||||||
|
|||||||
Reference in New Issue
Block a user