From c973e99f4b099b0bf5453ea6e8841c7bb527640e Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Fri, 7 Nov 2025 17:42:55 +0000 Subject: [PATCH] feat: WorkQueue priorities (#2721) Co-authored-by: Sergey Kuznetsov Co-authored-by: Ayaz Salikhov --- src/rpc/Counters.cpp | 2 +- src/rpc/Counters.hpp | 4 +- src/rpc/WorkQueue.cpp | 177 ++++++++++++++++++++++----- src/rpc/WorkQueue.hpp | 171 ++++++++++++++++---------- src/util/CMakeLists.txt | 2 + tests/common/util/MockPrometheus.hpp | 1 + tests/unit/rpc/CountersTests.cpp | 21 ++-- tests/unit/rpc/WorkQueueTests.cpp | 122 +++++++++++++----- 8 files changed, 362 insertions(+), 138 deletions(-) diff --git a/src/rpc/Counters.cpp b/src/rpc/Counters.cpp index afb6b489..a69e3cef 100644 --- a/src/rpc/Counters.cpp +++ b/src/rpc/Counters.cpp @@ -102,7 +102,7 @@ Counters::getMethodInfo(std::string const& method) return it->second; } -Counters::Counters(WorkQueue const& wq) +Counters::Counters(Reportable const& wq) : tooBusyCounter_( PrometheusService::counterInt( "rpc_error_total_number", diff --git a/src/rpc/Counters.hpp b/src/rpc/Counters.hpp index 962c89b7..77237333 100644 --- a/src/rpc/Counters.hpp +++ b/src/rpc/Counters.hpp @@ -66,7 +66,7 @@ class Counters { CounterType unknownCommandCounter_; CounterType internalErrorCounter_; - std::reference_wrapper workQueue_; + std::reference_wrapper workQueue_; std::chrono::time_point startupTime_; public: @@ -75,7 +75,7 @@ public: * * @param wq The work queue to operate on */ - Counters(WorkQueue const& wq); + Counters(Reportable const& wq); /** * @brief A factory function that creates a new counters instance. diff --git a/src/rpc/WorkQueue.cpp b/src/rpc/WorkQueue.cpp index 64412066..121b0d26 100644 --- a/src/rpc/WorkQueue.cpp +++ b/src/rpc/WorkQueue.cpp @@ -19,40 +19,28 @@ #include "rpc/WorkQueue.hpp" +#include "util/Assert.hpp" +#include "util/Spawn.hpp" #include "util/log/Logger.hpp" #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" +#include +#include +#include +#include #include +#include #include #include #include #include +#include namespace rpc { -void -WorkQueue::OneTimeCallable::setCallable(std::function func) -{ - func_ = func; -} - -void -WorkQueue::OneTimeCallable::operator()() -{ - if (not called_) { - func_(); - called_ = true; - } -} -WorkQueue::OneTimeCallable:: -operator bool() const -{ - return func_.operator bool(); -} - -WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize) +WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize) : queued_{PrometheusService::counterInt( "work_queue_queued_total_number", util::prometheus::Labels(), @@ -69,25 +57,156 @@ WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize) "The current number of tasks in the queue" )} , ioc_{numWorkers} + , strand_{ioc_.get_executor()} + , waitTimer_(ioc_) { if (maxSize != 0) maxSize_ = maxSize; } +WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize) + : WorkQueue(kDONT_START_PROCESSING_TAG, numWorkers, maxSize) +{ + startProcessing(); +} + WorkQueue::~WorkQueue() { - join(); + stop(); } void -WorkQueue::stop(std::function onQueueEmpty) +WorkQueue::startProcessing() +{ + util::spawn(strand_, [this](auto yield) { + ASSERT(not hasDispatcher_, "Dispatcher already running"); + + hasDispatcher_ = true; + dispatcherLoop(yield); + }); +} + +bool +WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority) +{ + if (stopping_) { + LOG(log_.warn()) << "Queue is stopping, rejecting incoming task."; + return false; + } + + if (size() 
>= maxSize_ && !isWhiteListed) { + LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << size() << "; max size = " << maxSize_; + return false; + } + + ++curSize_.get(); + auto needsWakeup = false; + + { + auto state = dispatcherState_.lock(); + + needsWakeup = std::exchange(state->isIdle, false); + + state->push(priority, std::move(func)); + } + + if (needsWakeup) + boost::asio::post(strand_, [this] { waitTimer_.cancel(); }); + + return true; +} + +void +WorkQueue::dispatcherLoop(boost::asio::yield_context yield) +{ + LOG(log_.info()) << "WorkQueue dispatcher starting"; + + // all ongoing tasks must be completed before stopping fully + while (not stopping_ or size() > 0) { + std::vector batch; + + { + auto state = dispatcherState_.lock(); + + if (state->empty()) { + state->isIdle = true; + } else { + for (auto count = 0uz; count < kTAKE_HIGH_PRIO and not state->high.empty(); ++count) { + batch.push_back(std::move(state->high.front())); + state->high.pop(); + } + + if (not state->normal.empty()) { + batch.push_back(std::move(state->normal.front())); + state->normal.pop(); + } + } + } + + if (not stopping_ and batch.empty()) { + waitTimer_.expires_at(std::chrono::steady_clock::time_point::max()); + boost::system::error_code ec; + waitTimer_.async_wait(yield[ec]); + } else { + for (auto task : std::move(batch)) { + util::spawn( + ioc_, + [this, spawnedAt = std::chrono::system_clock::now(), task = std::move(task)](auto yield) mutable { + auto const takenAt = std::chrono::system_clock::now(); + auto const waited = + std::chrono::duration_cast(takenAt - spawnedAt).count(); + + ++queued_.get(); + durationUs_.get() += waited; + LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size(); + + task(yield); + + --curSize_.get(); + } + ); + } + + boost::asio::post(ioc_.get_executor(), yield); // yield back to avoid hijacking the thread + } + } + + LOG(log_.info()) << "WorkQueue dispatcher shutdown requested - time to execute onTasksComplete"; + + { + auto onTasksComplete = onQueueEmpty_.lock(); + ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); + onTasksComplete->operator()(); + } + + LOG(log_.info()) << "WorkQueue dispatcher finished"; +} + +void +WorkQueue::requestStop(std::function onQueueEmpty) { auto handler = onQueueEmpty_.lock(); - handler->setCallable(std::move(onQueueEmpty)); + *handler = std::move(onQueueEmpty); + stopping_ = true; - if (size() == 0) { - handler->operator()(); + auto needsWakeup = false; + + { + auto state = dispatcherState_.lock(); + needsWakeup = std::exchange(state->isIdle, false); } + + if (needsWakeup) + boost::asio::post(strand_, [this] { waitTimer_.cancel(); }); +} + +void +WorkQueue::stop() +{ + if (not stopping_.exchange(true)) + requestStop(); + + ioc_.join(); } WorkQueue @@ -115,12 +234,6 @@ WorkQueue::report() const return obj; } -void -WorkQueue::join() -{ - ioc_.join(); -} - size_t WorkQueue::size() const { diff --git a/src/rpc/WorkQueue.hpp b/src/rpc/WorkQueue.hpp index 57610f4b..702a173d 100644 --- a/src/rpc/WorkQueue.hpp +++ b/src/rpc/WorkQueue.hpp @@ -19,9 +19,7 @@ #pragma once -#include "util/Assert.hpp" #include "util/Mutex.hpp" -#include "util/Spawn.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/log/Logger.hpp" #include "util/prometheus/Counter.hpp" @@ -29,23 +27,80 @@ #include #include +#include +#include #include #include #include #include -#include #include #include #include #include +#include namespace rpc { +/** + * @brief An interface for any 
class providing a report as json object + */ +struct Reportable { + virtual ~Reportable() = default; + + /** + * @brief Generate a report of the work queue state. + * + * @return The report as a JSON object. + */ + [[nodiscard]] virtual boost::json::object + report() const = 0; +}; + /** * @brief An asynchronous, thread-safe queue for RPC requests. */ -class WorkQueue { +class WorkQueue : public Reportable { + using TaskType = std::function; + using QueueType = std::queue; + +public: + /** + * @brief Represents a task scheduling priority + */ + enum class Priority : uint8_t { + High, + Default, + }; + +private: + struct DispatcherState { + QueueType high; + QueueType normal; + + bool isIdle = false; + + void + push(Priority priority, auto&& task) + { + auto& queue = [this, priority] -> QueueType& { + if (priority == Priority::High) + return high; + return normal; + }(); + queue.push(std::forward(task)); + } + + [[nodiscard]] bool + empty() const + { + return high.empty() and normal.empty(); + } + }; + +private: + static constexpr auto kTAKE_HIGH_PRIO = 4uz; + // these are cumulative for the lifetime of the process std::reference_wrapper queued_; std::reference_wrapper durationUs_; @@ -55,33 +110,46 @@ class WorkQueue { util::Logger log_{"RPC"}; boost::asio::thread_pool ioc_; + boost::asio::strand strand_; + bool hasDispatcher_ = false; std::atomic_bool stopping_; - class OneTimeCallable { - std::function func_; - bool called_{false}; - - public: - void - setCallable(std::function func); - - void - operator()(); - - operator bool() const; - }; - util::Mutex onQueueEmpty_; + util::Mutex> onQueueEmpty_; + util::Mutex dispatcherState_; + boost::asio::steady_timer waitTimer_; public: + struct DontStartProcessingTag {}; + static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {}; + /** - * @brief Create an we instance of the work queue. + * @brief Create an instance of the work queue. + * + * The work queue immediately starts to process tasks as they come. * * @param numWorkers The amount of threads to spawn in the pool * @param maxSize The maximum capacity of the queue; 0 means unlimited */ WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0); - ~WorkQueue(); + + /** + * @brief Create an instance of the work queue without starting the processing of events. + * + * Clients are expected to call `startProcessing` manually once ready to start processing tasks. + * + * @param numWorkers The amount of threads to spawn in the pool + * @param maxSize The maximum capacity of the queue; 0 means unlimited + */ + WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0); + + ~WorkQueue() override; + + /** + * @brief Start processing of the enqueued tasks. + */ + void + startProcessing(); /** * @brief Put the work queue into a stopping state. This will prevent new jobs from being queued. @@ -89,7 +157,13 @@ public: * @param onQueueEmpty A callback to run when the last task in the queue is completed */ void - stop(std::function onQueueEmpty); + requestStop(std::function onQueueEmpty = [] {}); + + /** + * @brief Put the work queue into a stopping state and await workers to finish. + */ + void + stop(); /** * @brief A factory function that creates the work queue based on a config. 
@@ -97,7 +171,7 @@ public: * @param config The Clio config to use * @return The work queue */ - static WorkQueue + [[nodiscard]] static WorkQueue makeWorkQueue(util::config::ClioConfigDefinition const& config); /** @@ -105,60 +179,21 @@ public: * * The job will be rejected if isWhiteListed is set to false and the current size of the queue reached capacity. * - * @tparam FnType The function object type * @param func The function object to queue as a job * @param isWhiteListed Whether the queue capacity applies to this job + * @param priority The priority of the task * @return true if the job was successfully queued; false otherwise */ - template bool - postCoro(FnType&& func, bool isWhiteListed) - { - if (stopping_) { - LOG(log_.warn()) << "Queue is stopping, rejecting incoming task."; - return false; - } - - if (curSize_.get().value() >= maxSize_ && !isWhiteListed) { - LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << curSize_.get().value() - << "; max size = " << maxSize_; - return false; - } - - ++curSize_.get(); - - // Each time we enqueue a job, we want to post a symmetrical job that will dequeue and run the job at the front - // of the job queue. - util::spawn( - ioc_, - [this, func = std::forward(func), start = std::chrono::system_clock::now()](auto yield) mutable { - auto const run = std::chrono::system_clock::now(); - auto const wait = std::chrono::duration_cast(run - start).count(); - - ++queued_.get(); - durationUs_.get() += wait; - LOG(log_.info()) << "WorkQueue wait time = " << wait << " queue size = " << curSize_.get().value(); - - func(yield); - --curSize_.get(); - if (curSize_.get().value() == 0 && stopping_) { - auto onTasksComplete = onQueueEmpty_.lock(); - ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true."); - onTasksComplete->operator()(); - } - } - ); - - return true; - } + postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default); /** * @brief Generate a report of the work queue state. * * @return The report as a JSON object. */ - boost::json::object - report() const; + [[nodiscard]] boost::json::object + report() const override; /** * @brief Wait until all the jobs in the queue are finished. @@ -171,8 +206,12 @@ public: * * @return The number of jobs in the queue. 
*/ - size_t + [[nodiscard]] size_t size() const; + +private: + void + dispatcherLoop(boost::asio::yield_context yield); }; } // namespace rpc diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 334103ab..ba393f29 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -52,6 +52,8 @@ target_link_libraries( clio_util PUBLIC Boost::headers Boost::iostreams + Boost::coroutine + Boost::context fmt::fmt openssl::openssl xrpl::libxrpl diff --git a/tests/common/util/MockPrometheus.hpp b/tests/common/util/MockPrometheus.hpp index 8b0b23dc..90edef87 100644 --- a/tests/common/util/MockPrometheus.hpp +++ b/tests/common/util/MockPrometheus.hpp @@ -185,6 +185,7 @@ struct MockPrometheusImpl : PrometheusInterface { } std::unordered_map> metrics; + std::unordered_map> counterIntImpls; std::unordered_map> counterUintImpls; std::unordered_map> counterDoubleImpls; diff --git a/tests/unit/rpc/CountersTests.cpp b/tests/unit/rpc/CountersTests.cpp index 6480e5a7..2a22e1e4 100644 --- a/tests/unit/rpc/CountersTests.cpp +++ b/tests/unit/rpc/CountersTests.cpp @@ -23,6 +23,7 @@ #include "util/MockPrometheus.hpp" #include "util/prometheus/Counter.hpp" +#include #include #include #include @@ -37,9 +38,15 @@ using util::prometheus::CounterInt; using util::prometheus::WithMockPrometheus; using util::prometheus::WithPrometheus; -struct RPCCountersTest : WithPrometheus { - WorkQueue queue{4u, 1024u}; // todo: mock instead - Counters counters{queue}; +struct RPCCountersBaseTest { + struct WorkQueueMock : Reportable { + MOCK_METHOD(boost::json::object, report, (), (const, override)); + }; + testing::StrictMock workQueueMock; +}; + +struct RPCCountersTest : WithPrometheus, RPCCountersBaseTest { + Counters counters{workQueueMock}; }; TEST_F(RPCCountersTest, CheckThatCountersAddUp) @@ -57,6 +64,7 @@ TEST_F(RPCCountersTest, CheckThatCountersAddUp) counters.onInternalError(); } + EXPECT_CALL(workQueueMock, report); auto const report = counters.report(); auto const& rpc = report.at(JS(rpc)).as_object(); @@ -103,13 +111,10 @@ TEST_F(RPCCountersTest, CheckThatCountersAddUp) EXPECT_EQ(boost::json::value_to(report.at("bad_syntax_errors")), "512"); EXPECT_EQ(boost::json::value_to(report.at("unknown_command_errors")), "512"); EXPECT_EQ(boost::json::value_to(report.at("internal_errors")), "512"); - - EXPECT_EQ(report.at("work_queue"), queue.report()); // Counters report includes queue report } -struct RPCCountersMockPrometheusTests : WithMockPrometheus { - WorkQueue queue{4u, 1024u}; // todo: mock instead - Counters counters{queue}; +struct RPCCountersMockPrometheusTests : WithMockPrometheus, RPCCountersBaseTest { + Counters counters{workQueueMock}; }; TEST_F(RPCCountersMockPrometheusTests, rpcFailed) diff --git a/tests/unit/rpc/WorkQueueTests.cpp b/tests/unit/rpc/WorkQueueTests.cpp index bce7c9f5..d32422e6 100644 --- a/tests/unit/rpc/WorkQueueTests.cpp +++ b/tests/unit/rpc/WorkQueueTests.cpp @@ -30,8 +30,10 @@ #include #include +#include #include #include +#include using namespace util; using namespace util::config; @@ -39,15 +41,24 @@ using namespace rpc; using namespace util::prometheus; struct RPCWorkQueueTestBase : public virtual ::testing::Test { - ClioConfigDefinition cfg = { - {"server.max_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(2)}, - {"workers", ConfigValue{ConfigType::Integer}.defaultValue(4)} - }; + ClioConfigDefinition cfg; + WorkQueue queue; - WorkQueue queue = WorkQueue::makeWorkQueue(cfg); + RPCWorkQueueTestBase(uint32_t workers, uint32_t maxQueueSize) + : cfg{ + 
{"server.max_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(maxQueueSize)}, + {"workers", ConfigValue{ConfigType::Integer}.defaultValue(workers)}, + } + , queue{WorkQueue::makeWorkQueue(cfg)} + { + } }; -struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase {}; +struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase { + WorkQueueTest() : RPCWorkQueueTestBase(/* workers = */ 4, /* maxQueueSize = */ 2) + { + } +}; TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp) { @@ -55,10 +66,10 @@ TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp) std::atomic_uint32_t executeCount = 0u; for (auto i = 0u; i < kTOTAL; ++i) { - queue.postCoro([&executeCount](auto /* yield */) { ++executeCount; }, true); + queue.postCoro([&executeCount](auto /* yield */) { ++executeCount; }, /* isWhiteListed = */ true); } - queue.join(); + queue.stop(); auto const report = queue.report(); @@ -71,7 +82,6 @@ TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp) TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) { static constexpr auto kTOTAL = 3u; - auto expectedCount = 2u; auto unblocked = false; std::mutex mtx; @@ -82,10 +92,8 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) [&](auto /* yield */) { std::unique_lock lk{mtx}; cv.wait(lk, [&] { return unblocked; }); - - --expectedCount; }, - false + /* isWhiteListed = */ false ); if (i == kTOTAL - 1) { @@ -99,10 +107,60 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) } } - queue.join(); + queue.stop(); EXPECT_TRUE(unblocked); } +struct WorkQueuePriorityTest : WithPrometheus, virtual ::testing::Test { + WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100}; +}; + +TEST_F(WorkQueuePriorityTest, HighPriorityTasks) +{ + static constexpr auto kTOTAL = 10; + std::vector executionOrder; + std::mutex mtx; + + for (int i = 0; i < kTOTAL; ++i) { + queue.postCoro( + [&](auto) { + std::lock_guard const lock(mtx); + executionOrder.push_back(WorkQueue::Priority::High); + }, + /* isWhiteListed = */ true, + WorkQueue::Priority::High + ); + queue.postCoro( + [&](auto) { + std::lock_guard const lock(mtx); + executionOrder.push_back(WorkQueue::Priority::Default); + }, + /* isWhiteListed = */ true, + WorkQueue::Priority::Default + ); + } + + queue.startProcessing(); + queue.stop(); + + // with 1 worker and the above, the execution order is deterministic + // we should see 4 high prio tasks, then 1 normal prio task, until high prio tasks are depleted + std::vector const expectedOrder = { + WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High, + WorkQueue::Priority::High, WorkQueue::Priority::Default, WorkQueue::Priority::High, + WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High, + WorkQueue::Priority::Default, WorkQueue::Priority::High, WorkQueue::Priority::High, + WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, + WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, + WorkQueue::Priority::Default, WorkQueue::Priority::Default, + }; + + ASSERT_EQ(executionOrder.size(), expectedOrder.size()); + for (auto i = 0uz; i < executionOrder.size(); ++i) { + EXPECT_EQ(executionOrder[i], expectedOrder[i]) << "Mismatch at index " << i; + } +} + struct WorkQueueStopTest : WorkQueueTest { testing::StrictMock> onTasksComplete; testing::StrictMock> taskMock; @@ -111,23 +169,24 @@ struct WorkQueueStopTest : WorkQueueTest { 
TEST_F(WorkQueueStopTest, RejectsNewTasksWhenStopping) { EXPECT_CALL(taskMock, Call()); - EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, false)); + EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false)); - queue.stop([]() {}); - EXPECT_FALSE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, false)); + queue.requestStop(); + EXPECT_FALSE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false)); - queue.join(); + queue.stop(); } TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingAndQueueIsEmpty) { EXPECT_CALL(taskMock, Call()); - EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, false)); + EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false)); EXPECT_CALL(onTasksComplete, Call()).WillOnce([&]() { EXPECT_EQ(queue.size(), 0u); }); - queue.stop(onTasksComplete.AsStdFunction()); - queue.join(); + queue.requestStop(onTasksComplete.AsStdFunction()); + queue.stop(); } + TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask) { std::binary_semaphore semaphore{0}; @@ -138,19 +197,23 @@ TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask) taskMock.Call(); semaphore.acquire(); }, - false + /* isWhiteListed = */ false )); EXPECT_CALL(onTasksComplete, Call()).WillOnce([&]() { EXPECT_EQ(queue.size(), 0u); }); - queue.stop(onTasksComplete.AsStdFunction()); + queue.requestStop(onTasksComplete.AsStdFunction()); semaphore.release(); - queue.join(); + queue.stop(); } -struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase {}; +struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase { + WorkQueueMockPrometheusTest() : RPCWorkQueueTestBase(/* workers = */ 1, /*maxQueueSize = */ 2) + { + } +}; -TEST_F(WorkQueueMockPrometheusTest, postCoroCouhters) +TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) { auto& queuedMock = makeMock("work_queue_queued_total_number", ""); auto& durationMock = makeMock("work_queue_cumulative_tasks_duration_us", ""); @@ -158,16 +221,17 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCouhters) std::binary_semaphore semaphore{0}; - EXPECT_CALL(curSizeMock, value()).Times(2).WillRepeatedly(::testing::Return(0)); + EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)).WillRepeatedly(::testing::Return(1)); EXPECT_CALL(curSizeMock, add(1)); EXPECT_CALL(queuedMock, add(1)); - EXPECT_CALL(durationMock, add(::testing::Gt(0))).WillOnce([&](auto) { + EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) { EXPECT_CALL(curSizeMock, add(-1)); + EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)); semaphore.release(); }); - auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, false); + auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false); ASSERT_TRUE(res); - queue.join(); + queue.stop(); }
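
A minimal usage sketch of the priority-aware API this patch introduces. The main() scaffold, the worker/queue-size numbers, the extra includes, and the print statements are illustrative assumptions for the example only; the WorkQueue calls themselves (the constructor, postCoro with a Priority, requestStop, stop, report) follow the signatures added above, and the dispatcher drains up to four High tasks for every Default one.

// Illustrative caller of the new WorkQueue API; numbers, handlers and logging
// below are placeholders, not taken from the patch.
#include "rpc/WorkQueue.hpp"

#include <boost/json/serialize.hpp>

#include <iostream>

int
main()
{
    // The plain constructor starts the dispatcher immediately; the tagged
    // constructor (kDONT_START_PROCESSING_TAG) defers work until startProcessing().
    rpc::WorkQueue queue{/* numWorkers = */ 4, /* maxSize = */ 1024};

    // High-priority task: the dispatcher takes up to four of these for every
    // Default-priority task in each batch.
    queue.postCoro(
        [](auto /* yield */) { /* handle an admin request */ },
        /* isWhiteListed = */ true,
        rpc::WorkQueue::Priority::High
    );

    // Default-priority task: rejected (returns false) once the queue already
    // holds maxSize tasks, because it is not whitelisted.
    auto const accepted = queue.postCoro(
        [](auto /* yield */) { /* handle a regular request */ },
        /* isWhiteListed = */ false
    );
    if (not accepted)
        std::cout << "queue is full; request rejected\n";

    std::cout << boost::json::serialize(queue.report()) << '\n';

    // requestStop() only rejects new work and registers the completion callback;
    // stop() additionally waits for the thread pool to finish.
    queue.requestStop([] { std::cout << "all queued tasks completed\n"; });
    queue.stop();
}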