feat: Implement load balancer metrics in etlng (#2119)
@@ -29,12 +29,15 @@
#include "rpc/Errors.hpp"
#include "util/Assert.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Profiler.hpp"
#include "util/Random.hpp"
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ArrayView.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ObjectView.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
@@ -59,6 +62,7 @@
#include <vector>

using namespace util::config;
using util::prometheus::Labels;

namespace etlng {
@@ -85,6 +89,33 @@ LoadBalancer::LoadBalancer(
    std::shared_ptr<etl::NetworkValidatedLedgersInterface> validatedLedgers,
    SourceFactory sourceFactory
)
    : forwardingCounters_{
          .successDuration = PrometheusService::counterInt(
              "forwarding_duration_milliseconds_counter",
              Labels({util::prometheus::Label{"status", "success"}}),
              "The duration of processing successful forwarded requests"
          ),
          .failDuration = PrometheusService::counterInt(
              "forwarding_duration_milliseconds_counter",
              Labels({util::prometheus::Label{"status", "fail"}}),
              "The duration of processing failed forwarded requests"
          ),
          .retries = PrometheusService::counterInt(
              "forwarding_retries_counter",
              Labels(),
              "The number of retries before a forwarded request was successful. Initial attempt excluded"
          ),
          .cacheHit = PrometheusService::counterInt(
              "forwarding_cache_hit_counter",
              Labels(),
              "The number of requests that we served from the cache"
          ),
          .cacheMiss = PrometheusService::counterInt(
              "forwarding_cache_miss_counter",
              Labels(),
              "The number of requests that were not served from the cache"
          )
      }
{
    auto const forwardingCacheTimeout = config.get<float>("forwarding.cache_timeout");
    if (forwardingCacheTimeout > 0.f) {
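Taken together, the member initializer above registers five integer counters with Clio's Prometheus service and keeps references to them in forwardingCounters_; the two duration metrics share one metric name and are distinguished only by their "status" label. A minimal sketch of the registration-plus-increment pattern this change relies on, reusing the calls visible in the diff; the wrapper function and its name are hypothetical, and the assumption that counterInt returns a CounterInt& follows from the reference_wrapper members declared in the header further down:

#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"

#include <cstdint>

// Hypothetical illustration, not part of the commit: registration happens once
// (here at namespace scope, in the commit inside the constructor) and the returned
// reference is kept around; the increment is what forwardToRippledImpl later does
// with the measured duration.
auto& successDuration = PrometheusService::counterInt(
    "forwarding_duration_milliseconds_counter",
    util::prometheus::Labels({util::prometheus::Label{"status", "success"}}),
    "The duration of processing successful forwarded requests"
);

void
onForwardSucceeded(std::int64_t durationMs)
{
    successDuration += durationMs;  // CounterInt supports += and ++, as used later in this diff
}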
@@ -170,11 +201,6 @@ LoadBalancer::LoadBalancer(
    }
}

LoadBalancer::~LoadBalancer()
{
    sources_.clear();
}

std::vector<std::string>
LoadBalancer::loadInitialLedger(
    uint32_t sequence,
@@ -246,9 +272,11 @@ LoadBalancer::forwardToRippled(
    auto const cmd = boost::json::value_to<std::string>(request.at("command"));

    if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) {
        bool servedFromCache = true;
        auto updater =
-           [this, &request, &clientIp, isAdmin](boost::asio::yield_context yield
+           [this, &request, &clientIp, &servedFromCache, isAdmin](boost::asio::yield_context yield
            ) -> std::expected<util::ResponseExpirationCache::EntryData, util::ResponseExpirationCache::Error> {
            servedFromCache = false;
            auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield);
            if (result.has_value()) {
                return util::ResponseExpirationCache::EntryData{
@@ -266,6 +294,9 @@ LoadBalancer::forwardToRippled(
            std::move(updater),
            [](util::ResponseExpirationCache::EntryData const& entry) { return not entry.response.contains("error"); }
        );
        if (servedFromCache) {
            ++forwardingCounters_.cacheHit.get();
        }
        if (result.has_value()) {
            return std::move(result).value();
        }
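The hit accounting above rests on a small trick: servedFromCache starts out true and is flipped to false inside the updater, which the cache only invokes when it has no usable entry. After getOrUpdate returns, a still-true flag means the response never left the cache, so cacheHit is bumped; the miss side is counted in forwardToRippledImpl (next hunk). A self-contained toy version of the same pattern, with a hypothetical TinyCache standing in for util::ResponseExpirationCache and the miss counter moved into the updater for brevity:

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for util::ResponseExpirationCache; only the flag-flipping
// pattern mirrors the real code.
struct TinyCache {
    std::unordered_map<std::string, std::string> entries;

    // Return the cached value, or invoke `updater` to produce and store one.
    std::string
    getOrUpdate(std::string const& key, std::function<std::string()> updater)
    {
        if (auto it = entries.find(key); it != entries.end())
            return it->second;
        auto value = updater();
        entries.emplace(key, value);
        return value;
    }
};

int
main()
{
    TinyCache cache;
    int cacheHits = 0;
    int cacheMisses = 0;

    auto forwardOnce = [&](std::string const& cmd) {
        bool servedFromCache = true;  // assume a hit until the updater proves otherwise
        auto response = cache.getOrUpdate(cmd, [&] {
            servedFromCache = false;  // the updater only runs when the cache has no entry
            ++cacheMisses;
            return std::string{R"({"result": "ok"})"};
        });
        if (servedFromCache)
            ++cacheHits;
        return response;
    };

    forwardOnce("server_info");  // miss: updater runs, flag flipped
    forwardOnce("server_info");  // hit
    forwardOnce("server_info");  // hit
    std::cout << "hits=" << cacheHits << " misses=" << cacheMisses << '\n';  // hits=2 misses=1
}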
@@ -373,6 +404,8 @@ LoadBalancer::forwardToRippledImpl(
    boost::asio::yield_context yield
)
{
    ++forwardingCounters_.cacheMiss.get();

    ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
    std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);

@@ -383,11 +416,16 @@ LoadBalancer::forwardToRippledImpl(
    std::optional<boost::json::object> response;
    rpc::ClioError error = rpc::ClioError::EtlConnectionError;
    while (numAttempts < sources_.size()) {
-       auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
+       auto [res, duration] =
+           util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });

        if (res) {
            forwardingCounters_.successDuration.get() += duration;
            response = std::move(res).value();
            break;
        }
        forwardingCounters_.failDuration.get() += duration;
        ++forwardingCounters_.retries.get();
        error = std::max(error, res.error()); // Choose the best result between all sources

        sourceIdx = (sourceIdx + 1) % sources_.size();
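Here util::timed (from util/Profiler.hpp, added to the .cpp includes in the first hunk) wraps the forwarding call and returns both the source's result and the elapsed time; the duration then feeds either successDuration or failDuration, and retries counts every failed attempt before a success. As a rough illustration only, a hypothetical stand-in with an assumed signature and milliseconds as the unit, not Clio's actual implementation:

#include <chrono>
#include <utility>

// Hypothetical stand-in for util::timed: invoke a callable and return
// {result, elapsed milliseconds}. Signature and units are assumptions.
template <typename Func>
auto
timedMs(Func&& func)
{
    auto const start = std::chrono::steady_clock::now();
    auto result = std::forward<Func>(func)();
    auto const elapsedMs =
        std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();
    return std::pair{std::move(result), elapsedMs};
}

// Usage mirrors the hunk above: auto [res, duration] = timedMs([&] { return doForward(); });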
@@ -32,6 +32,7 @@
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/prometheus/Counter.hpp"

#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
@@ -92,6 +93,14 @@ private:
    std::uint32_t downloadRanges_ =
        kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */

    struct ForwardingCounters {
        std::reference_wrapper<util::prometheus::CounterInt> successDuration;
        std::reference_wrapper<util::prometheus::CounterInt> failDuration;
        std::reference_wrapper<util::prometheus::CounterInt> retries;
        std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
        std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
    } forwardingCounters_;

    // Using a mutex instead of atomic_bool because choosing a new source to
    // forward messages must happen under mutual exclusion; otherwise there would be a race condition
    util::Mutex<bool> hasForwardingSource_{false};
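ForwardingCounters stores std::reference_wrapper<CounterInt> rather than the counters themselves, presumably because the counter objects are owned by the Prometheus registry and the load balancer only needs handles to them; reference_wrapper also keeps the aggregate copyable and assignable where plain reference members would not. A toy illustration of that pattern, with a stand-in CounterInt type that is not the real util::prometheus class:

#include <cstdint>
#include <functional>

// Stand-in counter type for illustration only.
struct CounterInt {
    std::int64_t value = 0;
    CounterInt&
    operator+=(std::int64_t delta)
    {
        value += delta;
        return *this;
    }
};

// Mirrors the shape of ForwardingCounters: non-owning handles to registry-owned counters.
struct Counters {
    std::reference_wrapper<CounterInt> hits;
};

int
main()
{
    CounterInt registryOwnedCounter;  // owned elsewhere (the Prometheus registry in Clio)
    Counters counters{.hits = registryOwnedCounter};
    counters.hits.get() += 1;  // same call shape as ++forwardingCounters_.cacheHit.get()
    return registryOwnedCounter.value == 1 ? 0 : 1;
}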
@@ -147,8 +156,6 @@ public:
        SourceFactory sourceFactory = makeSource
    );

    ~LoadBalancer() override;

    /**
     * @brief Load the initial ledger, writing data to the queue.
     * @note This function will retry indefinitely until the ledger is downloaded.
@@ -61,6 +61,7 @@
using namespace etlng;
using namespace util::config;
using testing::Return;
using namespace util::prometheus;

namespace {
@@ -668,6 +669,71 @@ TEST_F(LoadBalancerForwardToRippledNgTests, source0Fails)
    });
}

struct LoadBalancerForwardToRippledPrometheusNgTests : LoadBalancerForwardToRippledNgTests, WithMockPrometheus {};

TEST_F(LoadBalancerForwardToRippledPrometheusNgTests, forwardingCacheEnabled)
{
    configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}};
    EXPECT_CALL(sourceFactory_, makeSource).Times(2);
    auto loadBalancer = makeLoadBalancer();

    auto const request = boost::json::object{{"command", "server_info"}};

    auto& cacheHitCounter = makeMock<CounterInt>("forwarding_cache_hit_counter", "");
    auto& cacheMissCounter = makeMock<CounterInt>("forwarding_cache_miss_counter", "");
    auto& successDurationCounter =
        makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"success\"}");

    EXPECT_CALL(cacheMissCounter, add(1));
    EXPECT_CALL(cacheHitCounter, add(1)).Times(3);
    EXPECT_CALL(successDurationCounter, add(testing::_));

    EXPECT_CALL(
        sourceFactory_.sourceAt(0),
        forwardToRippled(request, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
    )
        .WillOnce(Return(response_));

    runSpawn([&](boost::asio::yield_context yield) {
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
    });
}
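A quick sanity check of the expectations above: of the four identical server_info calls, only the first reaches source 0, producing one add on the miss counter and one duration sample on the success counter; the remaining three are answered from the forwarding cache, which is why the hit counter expects add(1) exactly three times.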
TEST_F(LoadBalancerForwardToRippledPrometheusNgTests, source0Fails)
{
    EXPECT_CALL(sourceFactory_, makeSource).Times(2);
    auto loadBalancer = makeLoadBalancer();

    auto& cacheMissCounter = makeMock<CounterInt>("forwarding_cache_miss_counter", "");
    auto& retriesCounter = makeMock<CounterInt>("forwarding_retries_counter", "");
    auto& successDurationCounter =
        makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"success\"}");
    auto& failDurationCounter = makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"fail\"}");

    EXPECT_CALL(cacheMissCounter, add(1));
    EXPECT_CALL(retriesCounter, add(1));
    EXPECT_CALL(successDurationCounter, add(testing::_));
    EXPECT_CALL(failDurationCounter, add(testing::_));

    EXPECT_CALL(
        sourceFactory_.sourceAt(0),
        forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
    )
        .WillOnce(Return(std::unexpected{rpc::ClioError::EtlConnectionError}));
    EXPECT_CALL(
        sourceFactory_.sourceAt(1),
        forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
    )
        .WillOnce(Return(response_));

    runSpawn([&](boost::asio::yield_context yield) {
        EXPECT_EQ(loadBalancer->forwardToRippled(request_, clientIP_, false, yield), response_);
    });
}
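Note that the miss counter still ticks once here even though no forwarding cache is configured: forwardToRippledImpl increments it unconditionally on entry, so it effectively counts every request that has to be forwarded to a source. Source 0's failed attempt accounts for the fail-duration sample and the single retry; source 1's success accounts for the success-duration sample.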
struct LoadBalancerForwardToRippledErrorNgTestBundle {
    std::string testName;
    rpc::ClioError firstSourceError;