// Patch summary: src/etlng/LoadBalancer.cpp and src/etlng/LoadBalancer.hpp
//
// Adds forwarding metrics to etlng::LoadBalancer.
//
// LoadBalancer.hpp
//   * Declares a private `ForwardingCounters` aggregate (member `forwardingCounters_`) holding five
//     `std::reference_wrapper` counter handles: successDuration, failDuration, retries, cacheHit, cacheMiss.
//   * Removes the `~LoadBalancer() override` declaration.
//
// LoadBalancer.cpp
//   * Constructor: initialises `forwardingCounters_` with designated initialisers; each handle comes from
//     `PrometheusService::counterInt(name, labels, description)`. The two duration counters share the metric
//     name "forwarding_duration_milliseconds_counter" and differ only by the `status` label ("success" / "fail").
//   * Removes the out-of-line destructor whose body was `sources_.clear();`.
//   * forwardToRippled (cached path): a local `servedFromCache` starts as true and is captured by reference
//     into the `updater` lambda, which sets it to false before calling `forwardToRippledImpl`. After the cache
//     call returns, `cacheHit` is incremented only if the flag is still true.
//   * forwardToRippledImpl: increments `cacheMiss` on entry; wraps each
//     `sources_[sourceIdx]->forwardToRippled(...)` call in `util::timed`, adding the measured duration to
//     `successDuration` when the result is truthy and to `failDuration` otherwise; increments `retries`
//     after every failed attempt.
//
// NOTE(review): `retries` is incremented on each failed attempt, while its description reads "retries before a
//   forwarded request was successful. Initial attempt excluded". Whether a request that fails on every source
//   should contribute to this counter cannot be decided from the visible hunk — confirm the intended meaning.
// NOTE(review): `cacheMiss` is incremented at the top of `forwardToRippledImpl` regardless of the caller; the
//   callers outside the cached branch are not visible in these hunks — confirm this is the intended accounting.
// NOTE(review): this copy of the patch appears to have lost its line breaks and the text inside angle brackets
//   (e.g. `#include` with no header name, `std::reference_wrapper` with no template argument) — presumably an
//   extraction artefact; verify against the original commit before trying to apply it.
diff --git a/src/etlng/LoadBalancer.cpp b/src/etlng/LoadBalancer.cpp index 7acbcd14..2bddb158 100644 --- a/src/etlng/LoadBalancer.cpp +++ b/src/etlng/LoadBalancer.cpp @@ -29,12 +29,15 @@ #include "rpc/Errors.hpp" #include "util/Assert.hpp" #include "util/CoroutineGroup.hpp" +#include "util/Profiler.hpp" #include "util/Random.hpp" #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" #include "util/newconfig/ArrayView.hpp" #include "util/newconfig/ConfigDefinition.hpp" #include "util/newconfig/ObjectView.hpp" +#include "util/prometheus/Label.hpp" +#include "util/prometheus/Prometheus.hpp" #include #include @@ -59,6 +62,7 @@ #include using namespace util::config; +using util::prometheus::Labels; namespace etlng { @@ -85,6 +89,33 @@ LoadBalancer::LoadBalancer( std::shared_ptr validatedLedgers, SourceFactory sourceFactory ) + : forwardingCounters_{ + .successDuration = PrometheusService::counterInt( + "forwarding_duration_milliseconds_counter", + Labels({util::prometheus::Label{"status", "success"}}), + "The duration of processing successful forwarded requests" + ), + .failDuration = PrometheusService::counterInt( + "forwarding_duration_milliseconds_counter", + Labels({util::prometheus::Label{"status", "fail"}}), + "The duration of processing failed forwarded requests" + ), + .retries = PrometheusService::counterInt( + "forwarding_retries_counter", + Labels(), + "The number of retries before a forwarded request was successful. 
Initial attempt excluded" + ), + .cacheHit = PrometheusService::counterInt( + "forwarding_cache_hit_counter", + Labels(), + "The number of requests that we served from the cache" + ), + .cacheMiss = PrometheusService::counterInt( + "forwarding_cache_miss_counter", + Labels(), + "The number of requests that were not served from the cache" + ) + } { auto const forwardingCacheTimeout = config.get("forwarding.cache_timeout"); if (forwardingCacheTimeout > 0.f) { @@ -170,11 +201,6 @@ LoadBalancer::LoadBalancer( } } -LoadBalancer::~LoadBalancer() -{ - sources_.clear(); -} - std::vector LoadBalancer::loadInitialLedger( uint32_t sequence, @@ -246,9 +272,11 @@ LoadBalancer::forwardToRippled( auto const cmd = boost::json::value_to(request.at("command")); if (forwardingCache_ and forwardingCache_->shouldCache(cmd)) { + bool servedFromCache = true; auto updater = - [this, &request, &clientIp, isAdmin](boost::asio::yield_context yield + [this, &request, &clientIp, &servedFromCache, isAdmin](boost::asio::yield_context yield ) -> std::expected { + servedFromCache = false; auto result = forwardToRippledImpl(request, clientIp, isAdmin, yield); if (result.has_value()) { return util::ResponseExpirationCache::EntryData{ @@ -266,6 +294,9 @@ LoadBalancer::forwardToRippled( std::move(updater), [](util::ResponseExpirationCache::EntryData const& entry) { return not entry.response.contains("error"); } ); + if (servedFromCache) { + ++forwardingCounters_.cacheHit.get(); + } if (result.has_value()) { return std::move(result).value(); } @@ -373,6 +404,8 @@ LoadBalancer::forwardToRippledImpl( boost::asio::yield_context yield ) { + ++forwardingCounters_.cacheMiss.get(); + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); @@ -383,11 +416,16 @@ LoadBalancer::forwardToRippledImpl( std::optional response; rpc::ClioError error = rpc::ClioError::EtlConnectionError; while (numAttempts < 
sources_.size()) { - auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); + auto [res, duration] = + util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); }); + if (res) { + forwardingCounters_.successDuration.get() += duration; response = std::move(res).value(); break; } + forwardingCounters_.failDuration.get() += duration; + ++forwardingCounters_.retries.get(); error = std::max(error, res.error()); // Choose the best result between all sources sourceIdx = (sourceIdx + 1) % sources_.size(); diff --git a/src/etlng/LoadBalancer.hpp b/src/etlng/LoadBalancer.hpp index fc84ad54..3d95ac68 100644 --- a/src/etlng/LoadBalancer.hpp +++ b/src/etlng/LoadBalancer.hpp @@ -32,6 +32,7 @@ #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" #include "util/newconfig/ConfigDefinition.hpp" +#include "util/prometheus/Counter.hpp" #include #include @@ -92,6 +93,14 @@ private: std::uint32_t downloadRanges_ = kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ + struct ForwardingCounters { + std::reference_wrapper successDuration; + std::reference_wrapper failDuration; + std::reference_wrapper retries; + std::reference_wrapper cacheHit; + std::reference_wrapper cacheMiss; + } forwardingCounters_; + // Using mutext instead of atomic_bool because choosing a new source to // forward messages should be done with a mutual exclusion otherwise there will be a race condition util::Mutex hasForwardingSource_{false}; @@ -147,8 +156,6 @@ public: SourceFactory sourceFactory = makeSource ); - ~LoadBalancer() override; - /** * @brief Load the initial ledger, writing data to the queue. * @note This function will retry indefinitely until the ledger is downloaded. 
// Patch summary: tests/unit/etlng/LoadBalancerTests.cpp
//
// Adds `using namespace util::prometheus;` and a new fixture `LoadBalancerForwardToRippledPrometheusNgTests`
// that derives from both `LoadBalancerForwardToRippledNgTests` and `WithMockPrometheus`, plus two tests that
// set expectations on mocked counters obtained through `makeMock(name, labels)`.
//
//   * forwardingCacheEnabled — sets `forwarding.cache_timeout` to 10 in the config, then issues the same
//     `server_info` request four times. Source 0 is expected to be asked exactly once (`WillOnce`), so the
//     test expects one `add(1)` on the cache-miss counter, three `add(1)` calls on the cache-hit counter,
//     and one `add(_)` (any value) on the success-duration counter.
//   * source0Fails — source 0 returns `std::unexpected{rpc::ClioError::EtlConnectionError}` and source 1
//     returns `response_`. The test expects one `add(1)` on the cache-miss counter, one `add(1)` on the
//     retries counter, and one `add(_)` on each of the success- and fail-duration counters.
//
// NOTE(review): the `makeMock(...)` calls carry no template argument in this copy of the patch — presumably
//   text inside angle brackets was lost in extraction; verify against the original commit.
diff --git a/tests/unit/etlng/LoadBalancerTests.cpp b/tests/unit/etlng/LoadBalancerTests.cpp index 4ddfc7e3..2b31b1cd 100644 --- a/tests/unit/etlng/LoadBalancerTests.cpp +++ b/tests/unit/etlng/LoadBalancerTests.cpp @@ -61,6 +61,7 @@ using namespace etlng; using namespace util::config; using testing::Return; +using namespace util::prometheus; namespace { @@ -668,6 +669,71 @@ TEST_F(LoadBalancerForwardToRippledNgTests, source0Fails) }); } +struct LoadBalancerForwardToRippledPrometheusNgTests : LoadBalancerForwardToRippledNgTests, WithMockPrometheus {}; + +TEST_F(LoadBalancerForwardToRippledPrometheusNgTests, forwardingCacheEnabled) +{ + configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}}; + EXPECT_CALL(sourceFactory_, makeSource).Times(2); + auto loadBalancer = makeLoadBalancer(); + + auto const request = boost::json::object{{"command", "server_info"}}; + + auto& cacheHitCounter = makeMock("forwarding_cache_hit_counter", ""); + auto& cacheMissCounter = makeMock("forwarding_cache_miss_counter", ""); + auto& successDurationCounter = + makeMock("forwarding_duration_milliseconds_counter", "{status=\"success\"}"); + + EXPECT_CALL(cacheMissCounter, add(1)); + EXPECT_CALL(cacheHitCounter, add(1)).Times(3); + EXPECT_CALL(successDurationCounter, add(testing::_)); + + EXPECT_CALL( + sourceFactory_.sourceAt(0), + forwardToRippled(request, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_) + ) + .WillOnce(Return(response_)); + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_); + }); +} + +TEST_F(LoadBalancerForwardToRippledPrometheusNgTests, source0Fails) +{ + 
EXPECT_CALL(sourceFactory_, makeSource).Times(2); + auto loadBalancer = makeLoadBalancer(); + + auto& cacheMissCounter = makeMock("forwarding_cache_miss_counter", ""); + auto& retriesCounter = makeMock("forwarding_retries_counter", ""); + auto& successDurationCounter = + makeMock("forwarding_duration_milliseconds_counter", "{status=\"success\"}"); + auto& failDurationCounter = makeMock("forwarding_duration_milliseconds_counter", "{status=\"fail\"}"); + + EXPECT_CALL(cacheMissCounter, add(1)); + EXPECT_CALL(retriesCounter, add(1)); + EXPECT_CALL(successDurationCounter, add(testing::_)); + EXPECT_CALL(failDurationCounter, add(testing::_)); + + EXPECT_CALL( + sourceFactory_.sourceAt(0), + forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_) + ) + .WillOnce(Return(std::unexpected{rpc::ClioError::EtlConnectionError})); + EXPECT_CALL( + sourceFactory_.sourceAt(1), + forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_) + ) + .WillOnce(Return(response_)); + + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_EQ(loadBalancer->forwardToRippled(request_, clientIP_, false, yield), response_); + }); +} + struct LoadBalancerForwardToRippledErrorNgTestBundle { std::string testName; rpc::ClioError firstSourceError;