From 7eff1e6a9e3487210219b57f94f72ee14b549446 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Thu, 15 May 2025 12:34:43 +0100 Subject: [PATCH] feat: Add forwarding metrics --- src/etl/LoadBalancer.cpp | 40 ++++++++++++++++- src/etl/LoadBalancer.hpp | 11 +++++ tests/unit/etl/LoadBalancerTests.cpp | 67 ++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index fa0010a0b..cc8367cff 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -28,12 +28,15 @@ #include "rpc/Errors.hpp" #include "util/Assert.hpp" #include "util/CoroutineGroup.hpp" +#include "util/Profiler.hpp" #include "util/Random.hpp" #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" #include "util/newconfig/ArrayView.hpp" #include "util/newconfig/ConfigDefinition.hpp" #include "util/newconfig/ObjectView.hpp" +#include "util/prometheus/Label.hpp" +#include "util/prometheus/Prometheus.hpp" #include #include @@ -57,6 +60,7 @@ #include using namespace util::config; +using util::prometheus::Labels; namespace etl { @@ -83,6 +87,34 @@ LoadBalancer::LoadBalancer( std::shared_ptr validatedLedgers, SourceFactory sourceFactory ) + /* Bind every forwarding counter to a metric obtained from PrometheusService::counterInt(name, labels, description). Both duration counters use the same metric name and differ only in the value of the status label. */ : forwardingCounters_{ + .successDuration = PrometheusService::counterInt( + "forwarding_duration_milliseconds_counter", + Labels({util::prometheus::Label{"status", "success"}}), + "The duration of processing successful forwarded requests" + ), + .failDuration = PrometheusService::counterInt( + "forwarding_duration_milliseconds_counter", + Labels({util::prometheus::Label{"status", "fail"}}), + "The duration of processing failed forwarded requests" + ), + .retries = PrometheusService::counterInt( + "forwarding_retries_counter", + Labels(), + "The number of retries before a forwarded request was successful. 
Initial attempt excluded" + ), + .cacheHit = PrometheusService::counterInt( + "forwarding_cache_hit_counter", + Labels(), + "The number of requests that we served from the cache" + ), + .cacheMiss = PrometheusService::counterInt( + "forwarding_cache_miss_counter", + Labels(), + "The number of requests that were not served from the cache" + ) + } + { auto const forwardingCacheTimeout = config.get("forwarding.cache_timeout"); if (forwardingCacheTimeout > 0.f) { @@ -241,9 +273,11 @@ LoadBalancer::forwardToRippled( auto const cmd = boost::json::value_to(request.at("command")); if (forwardingCache_) { if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { + ++forwardingCounters_.cacheHit.get(); return std::move(cachedResponse).value(); } } + ++forwardingCounters_.cacheMiss.get(); ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); @@ -255,11 +289,15 @@ LoadBalancer::forwardToRippled( std::optional response; rpc::ClioError error = rpc::ClioError::EtlConnectionError; while (numAttempts < sources_.size()) { - auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); + auto [res, duration] = + util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); }); if (res) { + forwardingCounters_.successDuration.get() += duration; response = std::move(res).value(); break; } + forwardingCounters_.failDuration.get() += duration; + ++forwardingCounters_.retries.get(); error = std::max(error, res.error()); // Choose the best result between all sources sourceIdx = (sourceIdx + 1) % sources_.size(); diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index 169636685..87c4fb870 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -32,6 +32,7 @@ #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" #include "util/newconfig/ConfigDefinition.hpp" 
+#include "util/prometheus/Counter.hpp" #include #include @@ -44,8 +45,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -91,6 +94,14 @@ private: std::uint32_t downloadRanges_ = kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ + /** @brief Counters updated by forwardToRippled(): successDuration and failDuration accumulate the measured duration of each successful or failed source attempt, retries is incremented after every failed attempt, cacheHit is incremented when the forwarding cache returns a response and cacheMiss when the request goes on to the sources. Members are references to the counters returned by PrometheusService::counterInt. NOTE(review): retries also counts the final failed attempt, after which no further source may be tried - confirm this matches the metric description. */ struct ForwardingCounters { + std::reference_wrapper successDuration; + std::reference_wrapper failDuration; + std::reference_wrapper retries; + std::reference_wrapper cacheHit; + std::reference_wrapper cacheMiss; + } forwardingCounters_; + // Using mutext instead of atomic_bool because choosing a new source to // forward messages should be done with a mutual exclusion otherwise there will be a race condition util::Mutex hasForwardingSource_{false}; diff --git a/tests/unit/etl/LoadBalancerTests.cpp b/tests/unit/etl/LoadBalancerTests.cpp index 95d0216a0..5829b56e6 100644 --- a/tests/unit/etl/LoadBalancerTests.cpp +++ b/tests/unit/etl/LoadBalancerTests.cpp @@ -34,6 +34,7 @@ #include "util/newconfig/ConfigFileJson.hpp" #include "util/newconfig/ConfigValue.hpp" #include "util/newconfig/Types.hpp" +#include "util/prometheus/Counter.hpp" #include #include @@ -59,6 +60,7 @@ using namespace etl; using namespace util::config; using testing::Return; +using namespace util::prometheus; constexpr static auto const kTWO_SOURCES_LEDGER_RESPONSE = R"({ "etl_sources": [ @@ -641,6 +643,71 @@ TEST_F(LoadBalancerForwardToRippledTests, source0Fails) }); } +struct LoadBalancerForwardToRippledPrometheusTests : LoadBalancerForwardToRippledTests, WithMockPrometheus {}; + +/* Cache enabled via forwarding.cache_timeout = 10 and the same request sent four times: expects a single forward to source 0, one cache miss, three cache hits and one update of the success duration counter. */ TEST_F(LoadBalancerForwardToRippledPrometheusTests, forwardingCacheEnabled) +{ + configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}}; + EXPECT_CALL(sourceFactory_, makeSource).Times(2); + auto loadBalancer = makeLoadBalancer(); + + auto const request = boost::json::object{{"command", "server_info"}}; + + auto& cacheHitCounter = makeMock("forwarding_cache_hit_counter", ""); + auto& 
cacheMissCounter = makeMock("forwarding_cache_miss_counter", ""); + auto& successDurationCounter = + makeMock("forwarding_duration_milliseconds_counter", "{status=\"success\"}"); + + EXPECT_CALL(cacheMissCounter, add(1)); + EXPECT_CALL(cacheHitCounter, add(1)).Times(3); + EXPECT_CALL(successDurationCounter, add(testing::_)); + + EXPECT_CALL( + sourceFactory_.sourceAt(0), + forwardToRippled(request, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_) + ) + .WillOnce(Return(response_)); + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + }); +} + +/* Source 0 returns EtlConnectionError and source 1 returns the response: expects one cache miss, one retry and one update of each duration counter. */ TEST_F(LoadBalancerForwardToRippledPrometheusTests, source0Fails) +{ + EXPECT_CALL(sourceFactory_, makeSource).Times(2); + auto loadBalancer = makeLoadBalancer(); + + auto& cacheMissCounter = makeMock("forwarding_cache_miss_counter", ""); + auto& retriesCounter = makeMock("forwarding_retries_counter", ""); + auto& successDurationCounter = + makeMock("forwarding_duration_milliseconds_counter", "{status=\"success\"}"); + auto& failDurationCounter = makeMock("forwarding_duration_milliseconds_counter", "{status=\"fail\"}"); + + EXPECT_CALL(cacheMissCounter, add(1)); + EXPECT_CALL(retriesCounter, add(1)); + EXPECT_CALL(successDurationCounter, add(testing::_)); + EXPECT_CALL(failDurationCounter, add(testing::_)); + + EXPECT_CALL( + sourceFactory_.sourceAt(0), + forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_) + ) + .WillOnce(Return(std::unexpected{rpc::ClioError::EtlConnectionError})); + EXPECT_CALL( + sourceFactory_.sourceAt(1), + forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, 
testing::_) + ) + .WillOnce(Return(response_)); + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_EQ(loadBalancer->forwardToRippled(request_, clientIP_, false, yield), response_); + }); +} + struct LoadBalancerForwardToRippledErrorTestBundle { std::string testName; rpc::ClioError firstSourceError;