From 1b1a46c429102a7a17ab569b5c193afd9be5fda0 Mon Sep 17 00:00:00 2001 From: Ayaz Salikhov Date: Mon, 24 Nov 2025 16:17:45 +0000 Subject: [PATCH] feat: Handle prometheus requests in WorkQueue (#2790) --- src/app/ClioApplication.cpp | 2 +- src/app/WebHandlers.cpp | 52 +++++++++++++++++++++++------ src/app/WebHandlers.hpp | 8 +++-- tests/unit/app/WebHandlersTests.cpp | 5 ++- 4 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 23cc4dff..061b0814 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -182,7 +182,7 @@ ClioApplication::run(bool const useNgWebServer) return EXIT_FAILURE; } - httpServer->onGet("/metrics", MetricsHandler{adminVerifier}); + httpServer->onGet("/metrics", MetricsHandler{adminVerifier, workQueue}); httpServer->onGet("/health", HealthCheckHandler{}); httpServer->onGet("/cache_state", CacheStateHandler{cache}); auto requestHandler = RequestHandler{adminVerifier, handler}; diff --git a/src/app/WebHandlers.cpp b/src/app/WebHandlers.cpp index b0d99385..a84fd97b 100644 --- a/src/app/WebHandlers.cpp +++ b/src/app/WebHandlers.cpp @@ -19,7 +19,10 @@ #include "app/WebHandlers.hpp" +#include "rpc/Errors.hpp" +#include "rpc/WorkQueue.hpp" #include "util/Assert.hpp" +#include "util/CoroutineGroup.hpp" #include "util/prometheus/Http.hpp" #include "web/AdminVerificationStrategy.hpp" #include "web/SubscriptionContextInterface.hpp" @@ -31,6 +34,7 @@ #include #include +#include #include #include #include @@ -76,8 +80,8 @@ DisconnectHook::operator()(web::ng::Connection const& connection) dosguard_.get().decrement(connection.ip()); } -MetricsHandler::MetricsHandler(std::shared_ptr adminVerifier) - : adminVerifier_{std::move(adminVerifier)} +MetricsHandler::MetricsHandler(std::shared_ptr adminVerifier, rpc::WorkQueue& workQueue) + : adminVerifier_{std::move(adminVerifier)}, workQueue_{std::ref(workQueue)} { } @@ -86,19 +90,45 @@ MetricsHandler::operator()( web::ng::Request const& request, web::ng::ConnectionMetadata& connectionMetadata, web::SubscriptionContextPtr, - boost::asio::yield_context + boost::asio::yield_context yield ) { - auto const maybeHttpRequest = request.asHttpRequest(); - ASSERT(maybeHttpRequest.has_value(), "Got not a http request in Get"); - auto const& httpRequest = maybeHttpRequest->get(); + std::optional response; + util::CoroutineGroup coroutineGroup{yield, 1}; + auto const onTaskComplete = coroutineGroup.registerForeign(yield); + ASSERT(onTaskComplete.has_value(), "Coroutine group can't be full"); - // FIXME(#1702): Using veb server thread to handle prometheus request. Better to post on work queue. - auto maybeResponse = util::prometheus::handlePrometheusRequest( - httpRequest, adminVerifier_->isAdmin(httpRequest, connectionMetadata.ip()) + bool const postSuccessful = workQueue_.get().postCoro( + [this, &request, &response, &onTaskComplete = onTaskComplete.value(), &connectionMetadata]( + boost::asio::yield_context + ) mutable { + auto const maybeHttpRequest = request.asHttpRequest(); + ASSERT(maybeHttpRequest.has_value(), "Got not a http request in Get"); + auto const& httpRequest = maybeHttpRequest->get(); + + auto maybeResponse = util::prometheus::handlePrometheusRequest( + httpRequest, adminVerifier_->isAdmin(httpRequest, connectionMetadata.ip()) + ); + ASSERT(maybeResponse.has_value(), "Got unexpected request for Prometheus"); + response = web::ng::Response{std::move(maybeResponse).value(), request}; + // notify the coroutine group that the foreign task is done + onTaskComplete(); + }, + /* isWhiteListed= */ true, + rpc::WorkQueue::Priority::High ); - ASSERT(maybeResponse.has_value(), "Got unexpected request for Prometheus"); - return web::ng::Response{std::move(maybeResponse).value(), request}; + + if (!postSuccessful) { + return web::ng::Response{ + boost::beast::http::status::too_many_requests, rpc::makeError(rpc::RippledError::rpcTOO_BUSY), request + }; + } + + // Put the coroutine to sleep until the foreign task is done + coroutineGroup.asyncWait(yield); + ASSERT(response.has_value(), "Woke up coroutine without setting response"); + + return std::move(response).value(); } web::ng::Response diff --git a/src/app/WebHandlers.hpp b/src/app/WebHandlers.hpp index a70d0fca..cd5880ce 100644 --- a/src/app/WebHandlers.hpp +++ b/src/app/WebHandlers.hpp @@ -21,6 +21,7 @@ #include "data/LedgerCacheInterface.hpp" #include "rpc/Errors.hpp" +#include "rpc/WorkQueue.hpp" #include "util/log/Logger.hpp" #include "web/AdminVerificationStrategy.hpp" #include "web/SubscriptionContextInterface.hpp" @@ -119,20 +120,23 @@ public: */ class MetricsHandler { std::shared_ptr adminVerifier_; + std::reference_wrapper workQueue_; public: /** * @brief Construct a new MetricsHandler object * * @param adminVerifier The AdminVerificationStrategy to use for verifying the connection for admin access. + * @param workQueue The WorkQueue to use for handling the request. */ - MetricsHandler(std::shared_ptr adminVerifier); + MetricsHandler(std::shared_ptr adminVerifier, rpc::WorkQueue& workQueue); /** * @brief The call of the function object. * * @param request The request to handle. * @param connectionMetadata The connection metadata. + * @param yield The yield context. * @return The response to the request. */ web::ng::Response @@ -140,7 +144,7 @@ public: web::ng::Request const& request, web::ng::ConnectionMetadata& connectionMetadata, web::SubscriptionContextPtr, - boost::asio::yield_context + boost::asio::yield_context yield ); }; diff --git a/tests/unit/app/WebHandlersTests.cpp b/tests/unit/app/WebHandlersTests.cpp index c2177e9f..58b9f062 100644 --- a/tests/unit/app/WebHandlersTests.cpp +++ b/tests/unit/app/WebHandlersTests.cpp @@ -19,6 +19,7 @@ #include "app/WebHandlers.hpp" #include "rpc/Errors.hpp" +#include "rpc/WorkQueue.hpp" #include "util/AsioContextTestFixture.hpp" #include "util/MockLedgerCache.hpp" #include "util/MockPrometheus.hpp" @@ -122,7 +123,9 @@ struct MetricsHandlerTests : util::prometheus::WithPrometheus, SyncAsioContextTe std::make_shared>() }; - MetricsHandler metricsHandler{adminVerifier}; + rpc::WorkQueue workQueue{1}; + + MetricsHandler metricsHandler{adminVerifier, workQueue}; web::ng::Request request{http::request{http::verb::get, "/metrics", 11}}; };