From 2327e81b0ba2cb9808c8690aa09c0d5ba8200c22 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Fri, 19 Dec 2025 15:26:55 +0000 Subject: [PATCH] fix: WorkQueue contention (#2866) Co-authored-by: Ayaz Salikhov --- benchmarks/CMakeLists.txt | 4 +- benchmarks/rpc/WorkQueueBenchmarks.cpp | 127 +++++++++++++++++++++++++ src/rpc/WorkQueue.cpp | 48 ++++------ src/rpc/WorkQueue.hpp | 23 +++++ 4 files changed, 171 insertions(+), 31 deletions(-) create mode 100644 benchmarks/rpc/WorkQueueBenchmarks.cpp diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index ec2d91eb9..e70ee2bb4 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -9,10 +9,12 @@ target_sources( util/async/ExecutionContextBenchmarks.cpp # Logger util/log/LoggerBenchmark.cpp + # WorkQueue + rpc/WorkQueueBenchmarks.cpp ) include(deps/gbench) target_include_directories(clio_benchmark PRIVATE .) -target_link_libraries(clio_benchmark PUBLIC clio_util benchmark::benchmark_main spdlog::spdlog) +target_link_libraries(clio_benchmark PUBLIC clio_util clio_rpc benchmark::benchmark_main spdlog::spdlog) set_target_properties(clio_benchmark PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/benchmarks/rpc/WorkQueueBenchmarks.cpp b/benchmarks/rpc/WorkQueueBenchmarks.cpp new file mode 100644 index 000000000..9b9ce6947 --- /dev/null +++ b/benchmarks/rpc/WorkQueueBenchmarks.cpp @@ -0,0 +1,127 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "rpc/WorkQueue.hpp" +#include "util/Assert.hpp" +#include "util/config/Array.hpp" +#include "util/config/ConfigConstraints.hpp" +#include "util/config/ConfigDefinition.hpp" +#include "util/config/ConfigValue.hpp" +#include "util/config/Types.hpp" +#include "util/log/Logger.hpp" +#include "util/prometheus/Prometheus.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace rpc; +using namespace util::config; + +namespace { + +auto const kCONFIG = ClioConfigDefinition{ + {"prometheus.compress_reply", ConfigValue{ConfigType::Boolean}.defaultValue(true)}, + {"prometheus.enabled", ConfigValue{ConfigType::Boolean}.defaultValue(true)}, + {"log.channels.[].channel", Array{ConfigValue{ConfigType::String}}}, + {"log.channels.[].level", Array{ConfigValue{ConfigType::String}}}, + {"log.level", ConfigValue{ConfigType::String}.defaultValue("info")}, + {"log.format", ConfigValue{ConfigType::String}.defaultValue(R"(%Y-%m-%d %H:%M:%S.%f %^%3!l:%n%$ - %v)")}, + {"log.is_async", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, + {"log.enable_console", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, + {"log.directory", ConfigValue{ConfigType::String}.optional()}, + {"log.rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048).withConstraint(gValidateUint32)}, + {"log.directory_max_files", ConfigValue{ConfigType::Integer}.defaultValue(25).withConstraint(gValidateUint32)}, + {"log.tag_style", ConfigValue{ConfigType::String}.defaultValue("none")}, 
+}; + +// this should be a fixture but it did not work with Args very well +void +init() +{ + static std::once_flag kONCE; + std::call_once(kONCE, [] { + PrometheusService::init(kCONFIG); + (void)util::LogService::init(kCONFIG); + }); +} + +} // namespace + +static void +benchmarkWorkQueue(benchmark::State& state) +{ + init(); + + auto const total = static_cast(state.range(0)); + auto const numThreads = static_cast(state.range(1)); + auto const maxSize = static_cast(state.range(2)); + auto const delayMs = static_cast(state.range(3)); + + for (auto _ : state) { + std::atomic_size_t totalExecuted = 0uz; + std::atomic_size_t totalQueued = 0uz; + + state.PauseTiming(); + WorkQueue queue(numThreads, maxSize); + state.ResumeTiming(); + + for (auto i = 0uz; i < total; ++i) { + totalQueued += static_cast(queue.postCoro( + [&delayMs, &totalExecuted](auto yield) { + ++totalExecuted; + + boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs}); + timer.async_wait(yield); + }, + /* isWhiteListed = */ false + )); + } + + queue.stop(); + + ASSERT(totalExecuted == totalQueued, "Totals don't match"); + ASSERT(totalQueued <= total, "Queued more than requested"); + ASSERT(totalQueued >= maxSize, "Queued less than maxSize"); + } +} + +// Usage example: +/* + ./clio_benchmark \ + --benchmark_repetitions=10 \ + --benchmark_display_aggregates_only=true \ + --benchmark_min_time=1x \ + --benchmark_filter="WorkQueue" +*/ +// TODO: figure out what happens on 1 thread +BENCHMARK(benchmarkWorkQueue) + ->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}}) + ->Unit(benchmark::kMillisecond); diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index 34617dec3..b676fc64b 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -34,8 +34,8 @@ #include #include #include +#include #include -#include namespace rpc { @@ -122,7 +122,7 @@ WorkQueue::dispatcherLoop(boost::asio::yield_context yield) // all ongoing tasks must be 
completed before stopping fully while (not stopping_ or size() > 0) { - std::vector batch; + std::optional task; { auto state = dispatcherState_.lock(); @@ -130,43 +130,31 @@ WorkQueue::dispatcherLoop(boost::asio::yield_context yield) if (state->empty()) { state->isIdle = true; } else { - for (auto count = 0uz; count < kTAKE_HIGH_PRIO and not state->high.empty(); ++count) { - batch.push_back(std::move(state->high.front())); - state->high.pop(); - } - - if (not state->normal.empty()) { - batch.push_back(std::move(state->normal.front())); - state->normal.pop(); - } + task = state->popNext(); } } - if (not stopping_ and batch.empty()) { + if (not stopping_ and not task.has_value()) { waitTimer_.expires_at(std::chrono::steady_clock::time_point::max()); boost::system::error_code ec; waitTimer_.async_wait(yield[ec]); - } else { - for (auto task : std::move(batch)) { - util::spawn( - ioc_, - [this, spawnedAt = std::chrono::system_clock::now(), task = std::move(task)](auto yield) mutable { - auto const takenAt = std::chrono::system_clock::now(); - auto const waited = - std::chrono::duration_cast(takenAt - spawnedAt).count(); + } else if (task.has_value()) { + util::spawn( + ioc_, + [this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable { + auto const takenAt = std::chrono::system_clock::now(); + auto const waited = + std::chrono::duration_cast(takenAt - spawnedAt).count(); - ++queued_.get(); - durationUs_.get() += waited; - LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); + ++queued_.get(); + durationUs_.get() += waited; + LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - task(yield); + task(yield); - --curSize_.get(); - } - ); - } - - boost::asio::post(ioc_.get_executor(), yield); // yield back to avoid hijacking the thread + --curSize_.get(); + } + ); } } diff --git a/src/rpc/WorkQueue.hpp b/src/rpc/WorkQueue.hpp index 702a173df..8fa466501 100644 --- 
a/src/rpc/WorkQueue.hpp
+++ b/src/rpc/WorkQueue.hpp
@@ -38,7 +38,9 @@
 #include
 #include
 #include
+#include
 #include
+#include
 
 namespace rpc {
 
@@ -79,6 +81,7 @@ private:
         QueueType normal;
 
         bool isIdle = false;
+        size_t highPriorityCounter = 0;  // `high` pops since the last `normal` pop
 
         void
         push(Priority priority, auto&& task)
@@ -96,6 +99,26 @@ private:
         {
             return high.empty() and normal.empty();
         }
+        /// Next task to run: at most kTAKE_HIGH_PRIO `high` pops in a row while `normal` has work; nullopt if both empty.
+        [[nodiscard]] std::optional  // NOTE(review): template argument missing in this copy of the patch - confirm
+        popNext()
+        {
+            if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
+                auto task = std::move(high.front());
+                high.pop();
+                ++highPriorityCounter;  // keeps growing while only `high` has work; reset by the next `normal` pop
+                return task;
+            }
+            // Reached when `high` is empty, or its quota (kTAKE_HIGH_PRIO) is used up while `normal` has work.
+            if (not normal.empty()) {
+                auto task = std::move(normal.front());
+                normal.pop();
+                highPriorityCounter = 0;  // a normal task was served, so `high` gets a fresh quota
+                return task;
+            }
+            // Both queues are empty.
+            return std::nullopt;
+        }
     };
 
 private: