mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	feat: Forwarding metrics (#2128)
Port of #2096 and #2103 into `release/2.4.1`.
This commit is contained in:
		@@ -28,12 +28,15 @@
 | 
			
		||||
#include "rpc/Errors.hpp"
 | 
			
		||||
#include "util/Assert.hpp"
 | 
			
		||||
#include "util/CoroutineGroup.hpp"
 | 
			
		||||
#include "util/Profiler.hpp"
 | 
			
		||||
#include "util/Random.hpp"
 | 
			
		||||
#include "util/ResponseExpirationCache.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/newconfig/ArrayView.hpp"
 | 
			
		||||
#include "util/newconfig/ConfigDefinition.hpp"
 | 
			
		||||
#include "util/newconfig/ObjectView.hpp"
 | 
			
		||||
#include "util/prometheus/Label.hpp"
 | 
			
		||||
#include "util/prometheus/Prometheus.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
#include <boost/asio/spawn.hpp>
 | 
			
		||||
@@ -57,6 +60,7 @@
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
using namespace util::config;
 | 
			
		||||
using util::prometheus::Labels;
 | 
			
		||||
 | 
			
		||||
namespace etl {
 | 
			
		||||
 | 
			
		||||
@@ -83,6 +87,34 @@ LoadBalancer::LoadBalancer(
 | 
			
		||||
    std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
 | 
			
		||||
    SourceFactory sourceFactory
 | 
			
		||||
)
 | 
			
		||||
    : forwardingCounters_{
 | 
			
		||||
          .successDuration = PrometheusService::counterInt(
 | 
			
		||||
              "forwarding_duration_milliseconds_counter",
 | 
			
		||||
              Labels({util::prometheus::Label{"status", "success"}}),
 | 
			
		||||
              "The duration of processing successful forwarded requests"
 | 
			
		||||
          ),
 | 
			
		||||
          .failDuration = PrometheusService::counterInt(
 | 
			
		||||
              "forwarding_duration_milliseconds_counter",
 | 
			
		||||
              Labels({util::prometheus::Label{"status", "fail"}}),
 | 
			
		||||
              "The duration of processing failed forwarded requests"
 | 
			
		||||
          ),
 | 
			
		||||
          .retries = PrometheusService::counterInt(
 | 
			
		||||
              "forwarding_retries_counter",
 | 
			
		||||
              Labels(),
 | 
			
		||||
              "The number of retries before a forwarded request was successful. Initial attempt excluded"
 | 
			
		||||
          ),
 | 
			
		||||
          .cacheHit = PrometheusService::counterInt(
 | 
			
		||||
              "forwarding_cache_hit_counter",
 | 
			
		||||
              Labels(),
 | 
			
		||||
              "The number of requests that we served from the cache"
 | 
			
		||||
          ),
 | 
			
		||||
          .cacheMiss = PrometheusService::counterInt(
 | 
			
		||||
              "forwarding_cache_miss_counter",
 | 
			
		||||
              Labels(),
 | 
			
		||||
              "The number of requests that were not served from the cache"
 | 
			
		||||
          )
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
{
 | 
			
		||||
    auto const forwardingCacheTimeout = config.get<float>("forwarding.cache_timeout");
 | 
			
		||||
    if (forwardingCacheTimeout > 0.f) {
 | 
			
		||||
@@ -241,9 +273,11 @@ LoadBalancer::forwardToRippled(
 | 
			
		||||
    auto const cmd = boost::json::value_to<std::string>(request.at("command"));
 | 
			
		||||
    if (forwardingCache_) {
 | 
			
		||||
        if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) {
 | 
			
		||||
            ++forwardingCounters_.cacheHit.get();
 | 
			
		||||
            return std::move(cachedResponse).value();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    ++forwardingCounters_.cacheMiss.get();
 | 
			
		||||
 | 
			
		||||
    ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
 | 
			
		||||
    std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
 | 
			
		||||
@@ -255,11 +289,15 @@ LoadBalancer::forwardToRippled(
 | 
			
		||||
    std::optional<boost::json::object> response;
 | 
			
		||||
    rpc::ClioError error = rpc::ClioError::EtlConnectionError;
 | 
			
		||||
    while (numAttempts < sources_.size()) {
 | 
			
		||||
        auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield);
 | 
			
		||||
        auto [res, duration] =
 | 
			
		||||
            util::timed([&]() { return sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); });
 | 
			
		||||
        if (res) {
 | 
			
		||||
            forwardingCounters_.successDuration.get() += duration;
 | 
			
		||||
            response = std::move(res).value();
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
        forwardingCounters_.failDuration.get() += duration;
 | 
			
		||||
        ++forwardingCounters_.retries.get();
 | 
			
		||||
        error = std::max(error, res.error());  // Choose the best result between all sources
 | 
			
		||||
 | 
			
		||||
        sourceIdx = (sourceIdx + 1) % sources_.size();
 | 
			
		||||
 
 | 
			
		||||
@@ -32,6 +32,7 @@
 | 
			
		||||
#include "util/ResponseExpirationCache.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/newconfig/ConfigDefinition.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio.hpp>
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -44,8 +45,10 @@
 | 
			
		||||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <concepts>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <expected>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <string>
 | 
			
		||||
@@ -91,6 +94,14 @@ private:
 | 
			
		||||
    std::uint32_t downloadRanges_ =
 | 
			
		||||
        kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
 | 
			
		||||
 | 
			
		||||
    struct ForwardingCounters {
 | 
			
		||||
        std::reference_wrapper<util::prometheus::CounterInt> successDuration;
 | 
			
		||||
        std::reference_wrapper<util::prometheus::CounterInt> failDuration;
 | 
			
		||||
        std::reference_wrapper<util::prometheus::CounterInt> retries;
 | 
			
		||||
        std::reference_wrapper<util::prometheus::CounterInt> cacheHit;
 | 
			
		||||
        std::reference_wrapper<util::prometheus::CounterInt> cacheMiss;
 | 
			
		||||
    } forwardingCounters_;
 | 
			
		||||
 | 
			
		||||
    // Using mutext instead of atomic_bool because choosing a new source to
 | 
			
		||||
    // forward messages should be done with a mutual exclusion otherwise there will be a race condition
 | 
			
		||||
    util::Mutex<bool> hasForwardingSource_{false};
 | 
			
		||||
 
 | 
			
		||||
@@ -34,6 +34,7 @@
 | 
			
		||||
#include "util/newconfig/ConfigFileJson.hpp"
 | 
			
		||||
#include "util/newconfig/ConfigValue.hpp"
 | 
			
		||||
#include "util/newconfig/Types.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
#include <boost/asio/spawn.hpp>
 | 
			
		||||
@@ -59,6 +60,7 @@
 | 
			
		||||
using namespace etl;
 | 
			
		||||
using namespace util::config;
 | 
			
		||||
using testing::Return;
 | 
			
		||||
using namespace util::prometheus;
 | 
			
		||||
 | 
			
		||||
constexpr static auto const kTWO_SOURCES_LEDGER_RESPONSE = R"({
 | 
			
		||||
    "etl_sources": [
 | 
			
		||||
@@ -641,6 +643,71 @@ TEST_F(LoadBalancerForwardToRippledTests, source0Fails)
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct LoadBalancerForwardToRippledPrometheusTests : LoadBalancerForwardToRippledTests, WithMockPrometheus {};
 | 
			
		||||
 | 
			
		||||
TEST_F(LoadBalancerForwardToRippledPrometheusTests, forwardingCacheEnabled)
 | 
			
		||||
{
 | 
			
		||||
    configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}};
 | 
			
		||||
    EXPECT_CALL(sourceFactory_, makeSource).Times(2);
 | 
			
		||||
    auto loadBalancer = makeLoadBalancer();
 | 
			
		||||
 | 
			
		||||
    auto const request = boost::json::object{{"command", "server_info"}};
 | 
			
		||||
 | 
			
		||||
    auto& cacheHitCounter = makeMock<util::prometheus::CounterInt>("forwarding_cache_hit_counter", "");
 | 
			
		||||
    auto& cacheMissCounter = makeMock<CounterInt>("forwarding_cache_miss_counter", "");
 | 
			
		||||
    auto& successDurationCounter =
 | 
			
		||||
        makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"success\"}");
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(cacheMissCounter, add(1));
 | 
			
		||||
    EXPECT_CALL(cacheHitCounter, add(1)).Times(3);
 | 
			
		||||
    EXPECT_CALL(successDurationCounter, add(testing::_));
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(
 | 
			
		||||
        sourceFactory_.sourceAt(0),
 | 
			
		||||
        forwardToRippled(request, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
 | 
			
		||||
    )
 | 
			
		||||
        .WillOnce(Return(response_));
 | 
			
		||||
 | 
			
		||||
    runSpawn([&](boost::asio::yield_context yield) {
 | 
			
		||||
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
 | 
			
		||||
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
 | 
			
		||||
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
 | 
			
		||||
        EXPECT_EQ(loadBalancer->forwardToRippled(request, clientIP_, false, yield), response_);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(LoadBalancerForwardToRippledPrometheusTests, source0Fails)
 | 
			
		||||
{
 | 
			
		||||
    EXPECT_CALL(sourceFactory_, makeSource).Times(2);
 | 
			
		||||
    auto loadBalancer = makeLoadBalancer();
 | 
			
		||||
 | 
			
		||||
    auto& cacheMissCounter = makeMock<CounterInt>("forwarding_cache_miss_counter", "");
 | 
			
		||||
    auto& retriesCounter = makeMock<CounterInt>("forwarding_retries_counter", "");
 | 
			
		||||
    auto& successDurationCounter =
 | 
			
		||||
        makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"success\"}");
 | 
			
		||||
    auto& failDurationCounter = makeMock<CounterInt>("forwarding_duration_milliseconds_counter", "{status=\"fail\"}");
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(cacheMissCounter, add(1));
 | 
			
		||||
    EXPECT_CALL(retriesCounter, add(1));
 | 
			
		||||
    EXPECT_CALL(successDurationCounter, add(testing::_));
 | 
			
		||||
    EXPECT_CALL(failDurationCounter, add(testing::_));
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(
 | 
			
		||||
        sourceFactory_.sourceAt(0),
 | 
			
		||||
        forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
 | 
			
		||||
    )
 | 
			
		||||
        .WillOnce(Return(std::unexpected{rpc::ClioError::EtlConnectionError}));
 | 
			
		||||
    EXPECT_CALL(
 | 
			
		||||
        sourceFactory_.sourceAt(1),
 | 
			
		||||
        forwardToRippled(request_, clientIP_, LoadBalancer::kUSER_FORWARDING_X_USER_VALUE, testing::_)
 | 
			
		||||
    )
 | 
			
		||||
        .WillOnce(Return(response_));
 | 
			
		||||
 | 
			
		||||
    runSpawn([&](boost::asio::yield_context yield) {
 | 
			
		||||
        EXPECT_EQ(loadBalancer->forwardToRippled(request_, clientIP_, false, yield), response_);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct LoadBalancerForwardToRippledErrorTestBundle {
 | 
			
		||||
    std::string testName;
 | 
			
		||||
    rpc::ClioError firstSourceError;
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user