diff --git a/benchmarks/rpc/WorkQueueBenchmarks.cpp b/benchmarks/rpc/WorkQueueBenchmarks.cpp index 51578c448..a5128abcc 100644 --- a/benchmarks/rpc/WorkQueueBenchmarks.cpp +++ b/benchmarks/rpc/WorkQueueBenchmarks.cpp @@ -29,13 +29,18 @@ #include #include +#include +#include +#include #include #include #include #include #include #include +#include +#include using namespace rpc; using namespace util::config; @@ -75,36 +80,56 @@ benchmarkWorkQueue(benchmark::State& state) { init(); - auto const total = static_cast(state.range(0)); - auto const numThreads = static_cast(state.range(1)); - auto const maxSize = static_cast(state.range(2)); - auto const delayMs = static_cast(state.range(3)); + auto const wqThreads = static_cast(state.range(0)); + auto const maxQueueSize = static_cast(state.range(1)); + auto const clientThreads = static_cast(state.range(2)); + auto const itemsPerClient = static_cast(state.range(3)); + auto const clientProcessingMs = static_cast(state.range(4)); for (auto _ : state) { std::atomic_size_t totalExecuted = 0uz; std::atomic_size_t totalQueued = 0uz; state.PauseTiming(); - WorkQueue queue(numThreads, maxSize); + WorkQueue queue(wqThreads, maxQueueSize); state.ResumeTiming(); - for (auto i = 0uz; i < total; ++i) { - totalQueued += static_cast(queue.postCoro( - [&delayMs, &totalExecuted](auto yield) { - ++totalExecuted; + std::vector threads; + threads.reserve(clientThreads); - boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs}); - timer.async_wait(yield); - }, - /* isWhiteListed = */ false - )); + for (auto t = 0uz; t < clientThreads; ++t) { + threads.emplace_back([&] { + for (auto i = 0uz; i < itemsPerClient; ++i) { + totalQueued += static_cast(queue.postCoro( + [&clientProcessingMs, &totalExecuted](auto yield) { + ++totalExecuted; + + boost::asio::steady_timer timer( + yield.get_executor(), std::chrono::milliseconds{clientProcessingMs} + ); + timer.async_wait(yield); + + std::this_thread::sleep_for(std::chrono::microseconds{10}); + }, + /* isWhiteListed = */ false + )); + } + }); } + for (auto& t : threads) + t.join(); + queue.stop(); ASSERT(totalExecuted == totalQueued, "Totals don't match"); - ASSERT(totalQueued <= total, "Queued more than requested"); - ASSERT(totalQueued >= maxSize, "Queued less than maxSize"); + ASSERT(totalQueued <= itemsPerClient * clientThreads, "Queued more than requested"); + + if (maxQueueSize == 0) { + ASSERT(totalQueued == itemsPerClient * clientThreads, "Queued exactly the expected amount"); + } else { + ASSERT(totalQueued >= std::min(maxQueueSize, itemsPerClient * clientThreads), "Queued less than expected"); + } } } @@ -118,5 +143,5 @@ benchmarkWorkQueue(benchmark::State& state) */ // TODO: figure out what happens on 1 thread BENCHMARK(benchmarkWorkQueue) - ->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}}) + ->ArgsProduct({{2, 4, 8, 16}, {0, 5'000}, {4, 8, 16}, {1'000, 10'000}, {10, 100, 250}}) ->Unit(benchmark::kMillisecond); diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index b676fc64b..a0a968823 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -25,9 +25,7 @@ #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" -#include #include -#include #include #include @@ -39,6 +37,27 @@ namespace rpc { +void +WorkQueue::OneTimeCallable::setCallable(std::function func) +{ + func_ = std::move(func); +} + +void +WorkQueue::OneTimeCallable::operator()() +{ + if (not called_) { + func_(); + called_ = true; + } +} + +WorkQueue::OneTimeCallable:: +operator bool() const +{ + return func_.operator bool(); +} + WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize) : queued_{PrometheusService::counterInt( "work_queue_queued_total_number", @@ -56,8 +75,6 @@ WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t "The current number of tasks in the queue" )} , ioc_{numWorkers} - , strand_{ioc_.get_executor()} - , waitTimer_(ioc_) { if (maxSize != 0) maxSize_ = maxSize; @@ -77,12 +94,14 @@ WorkQueue::~WorkQueue() void WorkQueue::startProcessing() { - util::spawn(strand_, [this](auto yield) { - ASSERT(not hasDispatcher_, "Dispatcher already running"); + ASSERT(not processingStarted_, "Attempt to start processing work queue more than once"); + processingStarted_ = true; - hasDispatcher_ = true; - dispatcherLoop(yield); - }); + // Spawn workers for all tasks that were queued before processing started + auto const numTasks = size(); + for (auto i = 0uz; i < numTasks; ++i) { + util::spawn(ioc_, [this](auto yield) { executeTask(yield); }); + } } bool @@ -98,93 +117,28 @@ WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority) return false; } - ++curSize_.get(); - auto needsWakeup = false; - { - auto state = dispatcherState_.lock(); - - needsWakeup = std::exchange(state->isIdle, false); - + auto state = queueState_.lock(); state->push(priority, std::move(func)); } - if (needsWakeup) - boost::asio::post(strand_, [this] { waitTimer_.cancel(); }); + ++curSize_.get(); + + if (not processingStarted_) + return true; + + util::spawn(ioc_, [this](auto yield) { executeTask(yield); }); return true; } -void -WorkQueue::dispatcherLoop(boost::asio::yield_context yield) -{ - LOG(log_.info()) << "WorkQueue dispatcher starting"; - - // all ongoing tasks must be completed before stopping fully - while (not stopping_ or size() > 0) { - std::optional task; - - { - auto state = dispatcherState_.lock(); - - if (state->empty()) { - state->isIdle = true; - } else { - task = state->popNext(); - } - } - - if (not stopping_ and not task.has_value()) { - waitTimer_.expires_at(std::chrono::steady_clock::time_point::max()); - boost::system::error_code ec; - waitTimer_.async_wait(yield[ec]); - } else if (task.has_value()) { - util::spawn( - ioc_, - [this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable { - auto const takenAt = std::chrono::system_clock::now(); - auto const waited = - std::chrono::duration_cast(takenAt - spawnedAt).count(); - - ++queued_.get(); - durationUs_.get() += waited; - LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); - - task(yield); - - --curSize_.get(); - } - ); - } - } - - LOG(log_.info()) << "WorkQueue dispatcher shutdown requested - time to execute onTasksComplete"; - - { - auto onTasksComplete = onQueueEmpty_.lock(); - ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); - onTasksComplete->operator()(); - } - - LOG(log_.info()) << "WorkQueue dispatcher finished"; -} - void WorkQueue::requestStop(std::function onQueueEmpty) { auto handler = onQueueEmpty_.lock(); - *handler = std::move(onQueueEmpty); + handler->setCallable(std::move(onQueueEmpty)); stopping_ = true; - auto needsWakeup = false; - - { - auto state = dispatcherState_.lock(); - needsWakeup = std::exchange(state->isIdle, false); - } - - if (needsWakeup) - boost::asio::post(strand_, [this] { waitTimer_.cancel(); }); } void @@ -194,6 +148,12 @@ WorkQueue::stop() requestStop(); ioc_.join(); + + { + auto onTasksComplete = onQueueEmpty_.lock(); + ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); + onTasksComplete->operator()(); + } } WorkQueue @@ -227,4 +187,29 @@ WorkQueue::size() const return curSize_.get().value(); } +void +WorkQueue::executeTask(boost::asio::yield_context yield) +{ + std::optional taskWithTimestamp; + { + auto state = queueState_.lock(); + taskWithTimestamp = state->popNext(); + } + + ASSERT( + taskWithTimestamp.has_value(), + "Queue should not be empty as we spawn a coro with executeTask for each postCoro." + ); + auto const takenAt = std::chrono::system_clock::now(); + auto const waited = + std::chrono::duration_cast(takenAt - taskWithTimestamp->queuedAt).count(); + + ++queued_.get(); + durationUs_.get() += waited; + LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); + + taskWithTimestamp->task(yield); + --curSize_.get(); +} + } // namespace rpc diff --git a/src/rpc/WorkQueue.hpp b/src/rpc/WorkQueue.hpp index 8fa466501..30ea4d7b3 100644 --- a/src/rpc/WorkQueue.hpp +++ b/src/rpc/WorkQueue.hpp @@ -25,15 +25,12 @@ #include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" -#include #include -#include -#include #include -#include #include #include +#include #include #include #include @@ -64,7 +61,13 @@ struct Reportable { */ class WorkQueue : public Reportable { using TaskType = std::function; - using QueueType = std::queue; + + struct TaskWithTimestamp { + TaskType task; + std::chrono::system_clock::time_point queuedAt; + }; + + using QueueType = std::queue; public: /** @@ -76,22 +79,21 @@ public: }; private: - struct DispatcherState { + struct QueueState { QueueType high; QueueType normal; - bool isIdle = false; size_t highPriorityCounter = 0; void - push(Priority priority, auto&& task) + push(Priority priority, TaskType&& task) { auto& queue = [this, priority] -> QueueType& { if (priority == Priority::High) return high; return normal; }(); - queue.push(std::forward(task)); + queue.push(TaskWithTimestamp{.task = std::move(task), .queuedAt = std::chrono::system_clock::now()}); } [[nodiscard]] bool @@ -100,21 +102,21 @@ private: return high.empty() and normal.empty(); } - [[nodiscard]] std::optional + [[nodiscard]] std::optional popNext() { if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) { - auto task = std::move(high.front()); + auto taskWithTimestamp = std::move(high.front()); high.pop(); ++highPriorityCounter; - return task; + return taskWithTimestamp; } if (not normal.empty()) { - auto task = std::move(normal.front()); + auto taskWithTimestamp = std::move(normal.front()); normal.pop(); highPriorityCounter = 0; - return task; + return taskWithTimestamp; } return std::nullopt; @@ -133,14 +135,26 @@ private: util::Logger log_{"RPC"}; boost::asio::thread_pool ioc_; - boost::asio::strand strand_; - bool hasDispatcher_ = false; std::atomic_bool stopping_; + std::atomic_bool processingStarted_{false}; - util::Mutex> onQueueEmpty_; - util::Mutex dispatcherState_; - boost::asio::steady_timer waitTimer_; + class OneTimeCallable { + std::function func_; + bool called_{false}; + + public: + void + setCallable(std::function func); + + void + operator()(); + + explicit + operator bool() const; + }; + util::Mutex onQueueEmpty_; + util::Mutex queueState_; public: struct DontStartProcessingTag {}; @@ -234,7 +248,7 @@ public: private: void - dispatcherLoop(boost::asio::yield_context yield); + executeTask(boost::asio::yield_context yield); }; } // namespace rpc diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index d32422e6c..c716c808d 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -18,6 +18,7 @@ //============================================================================== #include "rpc/WorkQueue.hpp" +#include "util/MockAssert.hpp" #include "util/MockPrometheus.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigValue.hpp" @@ -29,10 +30,12 @@ #include #include +#include #include #include #include #include +#include #include using namespace util; @@ -111,7 +114,32 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) EXPECT_TRUE(unblocked); } -struct WorkQueuePriorityTest : WithPrometheus, virtual ::testing::Test { +struct WorkQueueDelayedStartTest : WithPrometheus { + WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; +}; + +TEST_F(WorkQueueDelayedStartTest, WaitTimeIncludesDelayBeforeStartProcessing) +{ + std::atomic_bool taskExecuted = false; + + ASSERT_TRUE(queue.postCoro( + [&taskExecuted](auto /* yield */) { taskExecuted = true; }, + /* isWhiteListed = */ true + )); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + queue.startProcessing(); + queue.stop(); + + EXPECT_TRUE(taskExecuted); + + auto const report = queue.report(); + auto const durationUs = report.at("queued_duration_us").as_uint64(); + + EXPECT_GE(durationUs, 50000u) << "Wait time should include the delay before startProcessing"; +} + +struct WorkQueuePriorityTest : WithPrometheus { WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; }; @@ -207,11 +235,7 @@ TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask) queue.stop(); } -struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase { - WorkQueueMockPrometheusTest() : RPCWorkQueueTestBase(/* workers = */ 1, /*maxQueueSize = */ 2) - { - } -}; +struct WorkQueueMockPrometheusTest : WithMockPrometheus {}; TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) { @@ -221,17 +245,40 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) std::binary_semaphore semaphore{0}; - EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)).WillRepeatedly(::testing::Return(1)); + EXPECT_CALL(curSizeMock, value()) + .WillOnce(::testing::Return(0)) // in startProcessing + .WillOnce(::testing::Return(0)); // first check in postCoro EXPECT_CALL(curSizeMock, add(1)); EXPECT_CALL(queuedMock, add(1)); EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) { EXPECT_CALL(curSizeMock, add(-1)); - EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)); semaphore.release(); }); + // Note: the queue is not in the fixture because above expectations must be setup before startProcessing runs + WorkQueue queue(/* numWorkers = */ 4, /* maxSize = */ 2); auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false); ASSERT_TRUE(res); queue.stop(); } + +// Note: not using EXPECT_CLIO_ASSERT_FAIL because exception is swallowed by the WQ context +// TODO [https://github.com/XRPLF/clio/issues/2906]: Enable the test once we figure out a better way to do it without +// using up >2 minutes of CI time +struct WorkQueueDeathTest : WorkQueueMockPrometheusTest, common::util::WithMockAssert {}; +TEST_F(WorkQueueDeathTest, DISABLED_ExecuteTaskAssertsWhenQueueIsEmpty) +{ + [[maybe_unused]] auto& queuedMock = makeMock("work_queue_queued_total_number", ""); + [[maybe_unused]] auto& durationMock = makeMock("work_queue_cumulative_tasks_duration_us", ""); + auto& curSizeMock = makeMock("work_queue_current_size", ""); + + EXPECT_CALL(curSizeMock, value()).WillRepeatedly(::testing::Return(1)); // lie about the size + EXPECT_DEATH( + { + WorkQueue queue(WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 2); + queue.startProcessing(); // the actual queue is empty which will lead to assertion failure + }, + ".*" + ); +}