#include "rpc/WorkQueue.hpp" #include "util/MockAssert.hpp" #include "util/MockPrometheus.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigValue.hpp" #include "util/config/Types.hpp" #include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" #include #include #include #include #include #include #include #include #include #include using namespace util; using namespace util::config; using namespace rpc; using namespace util::prometheus; struct RPCWorkQueueTestBase : public virtual ::testing::Test { ClioConfigDefinition cfg; WorkQueue queue; RPCWorkQueueTestBase(uint32_t workers, uint32_t maxQueueSize) : cfg{ {"server.max_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(maxQueueSize)}, {"workers", ConfigValue{ConfigType::Integer}.defaultValue(workers)}, } , queue{WorkQueue::makeWorkQueue(cfg)} { } }; struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase { WorkQueueTest() : RPCWorkQueueTestBase(/* workers = */ 4, /* maxQueueSize = */ 2) { } }; TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp) { static constexpr auto kTOTAL = 512u; std::atomic_uint32_t executeCount = 0u; for (auto i = 0u; i < kTOTAL; ++i) { queue.postCoro( [&executeCount](auto /* yield */) { ++executeCount; }, /* isWhiteListed = */ true ); } queue.stop(); auto const report = queue.report(); EXPECT_EQ(executeCount, kTOTAL); EXPECT_EQ(report.at("queued"), kTOTAL); EXPECT_EQ(report.at("current_queue_size"), 0); EXPECT_EQ(report.at("max_queue_size"), 2); } TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded) { static constexpr auto kTOTAL = 3u; auto unblocked = false; std::mutex mtx; std::condition_variable cv; for (auto i = 0u; i < kTOTAL; ++i) { auto res = queue.postCoro( [&](auto /* yield */) { std::unique_lock lk{mtx}; cv.wait(lk, [&] { return unblocked; }); }, /* isWhiteListed = */ false ); if (i == kTOTAL - 1) { EXPECT_FALSE(res); std::unique_lock const lk{mtx}; unblocked = true; cv.notify_all(); } else { EXPECT_TRUE(res); } } queue.stop(); EXPECT_TRUE(unblocked); } struct WorkQueueDelayedStartTest : WithPrometheus { WorkQueue queue{ WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100 }; }; TEST_F(WorkQueueDelayedStartTest, WaitTimeIncludesDelayBeforeStartProcessing) { std::atomic_bool taskExecuted = false; ASSERT_TRUE(queue.postCoro( [&taskExecuted](auto /* yield */) { taskExecuted = true; }, /* isWhiteListed = */ true )); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.startProcessing(); queue.stop(); EXPECT_TRUE(taskExecuted); auto const report = queue.report(); auto const durationUs = report.at("queued_duration_us").as_uint64(); EXPECT_GE(durationUs, 50000u) << "Wait time should include the delay before startProcessing"; } struct WorkQueuePriorityTest : WithPrometheus { WorkQueue queue{ WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100 }; }; TEST_F(WorkQueuePriorityTest, HighPriorityTasks) { static constexpr auto kTOTAL = 10; std::vector executionOrder; std::mutex mtx; for (int i = 0; i < kTOTAL; ++i) { queue.postCoro( [&](auto) { std::lock_guard const lock(mtx); executionOrder.push_back(WorkQueue::Priority::High); }, /* isWhiteListed = */ true, WorkQueue::Priority::High ); queue.postCoro( [&](auto) { std::lock_guard const lock(mtx); executionOrder.push_back(WorkQueue::Priority::Default); }, /* isWhiteListed = */ true, WorkQueue::Priority::Default ); } queue.startProcessing(); queue.stop(); // with 1 worker and the above, the execution order is deterministic // we should see 4 high prio tasks, then 1 normal prio task, until high prio tasks are depleted std::vector const expectedOrder = { WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::Default, WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::Default, WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default, }; ASSERT_EQ(executionOrder.size(), expectedOrder.size()); for (auto i = 0uz; i < executionOrder.size(); ++i) { EXPECT_EQ(executionOrder[i], expectedOrder[i]) << "Mismatch at index " << i; } } struct WorkQueueStopTest : WorkQueueTest { testing::StrictMock> onTasksComplete; testing::StrictMock> taskMock; }; TEST_F(WorkQueueStopTest, RejectsNewTasksWhenStopping) { EXPECT_CALL(taskMock, Call()); EXPECT_TRUE( queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false) ); queue.requestStop(); EXPECT_FALSE( queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false) ); queue.stop(); } TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingAndQueueIsEmpty) { EXPECT_CALL(taskMock, Call()); EXPECT_TRUE( queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false) ); EXPECT_CALL(onTasksComplete, Call()).WillOnce([&]() { EXPECT_EQ(queue.size(), 0u); }); queue.requestStop(onTasksComplete.AsStdFunction()); queue.stop(); } TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask) { std::binary_semaphore semaphore{0}; EXPECT_CALL(taskMock, Call()); EXPECT_TRUE(queue.postCoro( [&](auto /* yield */) { taskMock.Call(); semaphore.acquire(); }, /* isWhiteListed = */ false )); EXPECT_CALL(onTasksComplete, Call()).WillOnce([&]() { EXPECT_EQ(queue.size(), 0u); }); queue.requestStop(onTasksComplete.AsStdFunction()); semaphore.release(); queue.stop(); } struct WorkQueueMockPrometheusTest : WithMockPrometheus {}; TEST_F(WorkQueueMockPrometheusTest, postCoroCounters) { auto& queuedMock = makeMock("work_queue_queued_total_number", ""); auto& durationMock = makeMock("work_queue_cumulative_tasks_duration_us", ""); auto& curSizeMock = makeMock("work_queue_current_size", ""); std::binary_semaphore semaphore{0}; EXPECT_CALL(curSizeMock, value()) .WillOnce(::testing::Return(0)) // in startProcessing .WillOnce(::testing::Return(0)); // first check in postCoro EXPECT_CALL(curSizeMock, add(1)); EXPECT_CALL(queuedMock, add(1)); EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) { EXPECT_CALL(curSizeMock, add(-1)); semaphore.release(); }); // Note: the queue is not in the fixture because above expectations must be setup before // startProcessing runs WorkQueue queue(/* numWorkers = */ 4, /* maxSize = */ 2); auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false); ASSERT_TRUE(res); queue.stop(); } // Note: not using EXPECT_CLIO_ASSERT_FAIL because exception is swallowed by the WQ context // TODO [https://github.com/XRPLF/clio/issues/2906]: Enable the test once we figure out a better way // to do it without using up >2 minutes of CI time struct WorkQueueDeathTest : WorkQueueMockPrometheusTest, common::util::WithMockAssert {}; TEST_F(WorkQueueDeathTest, DISABLED_ExecuteTaskAssertsWhenQueueIsEmpty) { [[maybe_unused]] auto& queuedMock = makeMock("work_queue_queued_total_number", ""); [[maybe_unused]] auto& durationMock = makeMock("work_queue_cumulative_tasks_duration_us", ""); auto& curSizeMock = makeMock("work_queue_current_size", ""); EXPECT_CALL(curSizeMock, value()).WillRepeatedly(::testing::Return(1)); // lie about the size EXPECT_DEATH( { WorkQueue queue( WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 2 ); queue.startProcessing(); // the actual queue is empty which will lead to assertion // failure }, ".*" ); }