mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
fix: Faster implementation of work queue (#2887)
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
//==============================================================================
|
||||
|
||||
#include "rpc/WorkQueue.hpp"
|
||||
#include "util/MockAssert.hpp"
|
||||
#include "util/MockPrometheus.hpp"
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/config/ConfigValue.hpp"
|
||||
@@ -29,10 +30,12 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <mutex>
|
||||
#include <semaphore>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
using namespace util;
|
||||
@@ -111,7 +114,32 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
|
||||
EXPECT_TRUE(unblocked);
|
||||
}
|
||||
|
||||
struct WorkQueuePriorityTest : WithPrometheus, virtual ::testing::Test {
|
||||
struct WorkQueueDelayedStartTest : WithPrometheus {
|
||||
WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100};
|
||||
};
|
||||
|
||||
TEST_F(WorkQueueDelayedStartTest, WaitTimeIncludesDelayBeforeStartProcessing)
|
||||
{
|
||||
std::atomic_bool taskExecuted = false;
|
||||
|
||||
ASSERT_TRUE(queue.postCoro(
|
||||
[&taskExecuted](auto /* yield */) { taskExecuted = true; },
|
||||
/* isWhiteListed = */ true
|
||||
));
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
queue.startProcessing();
|
||||
queue.stop();
|
||||
|
||||
EXPECT_TRUE(taskExecuted);
|
||||
|
||||
auto const report = queue.report();
|
||||
auto const durationUs = report.at("queued_duration_us").as_uint64();
|
||||
|
||||
EXPECT_GE(durationUs, 50000u) << "Wait time should include the delay before startProcessing";
|
||||
}
|
||||
|
||||
struct WorkQueuePriorityTest : WithPrometheus {
|
||||
WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100};
|
||||
};
|
||||
|
||||
@@ -207,11 +235,7 @@ TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask)
|
||||
queue.stop();
|
||||
}
|
||||
|
||||
struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase {
|
||||
WorkQueueMockPrometheusTest() : RPCWorkQueueTestBase(/* workers = */ 1, /*maxQueueSize = */ 2)
|
||||
{
|
||||
}
|
||||
};
|
||||
struct WorkQueueMockPrometheusTest : WithMockPrometheus {};
|
||||
|
||||
TEST_F(WorkQueueMockPrometheusTest, postCoroCounters)
|
||||
{
|
||||
@@ -221,17 +245,40 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCounters)
|
||||
|
||||
std::binary_semaphore semaphore{0};
|
||||
|
||||
EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)).WillRepeatedly(::testing::Return(1));
|
||||
EXPECT_CALL(curSizeMock, value())
|
||||
.WillOnce(::testing::Return(0)) // in startProcessing
|
||||
.WillOnce(::testing::Return(0)); // first check in postCoro
|
||||
EXPECT_CALL(curSizeMock, add(1));
|
||||
EXPECT_CALL(queuedMock, add(1));
|
||||
EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) {
|
||||
EXPECT_CALL(curSizeMock, add(-1));
|
||||
EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0));
|
||||
semaphore.release();
|
||||
});
|
||||
|
||||
// Note: the queue is not in the fixture because above expectations must be setup before startProcessing runs
|
||||
WorkQueue queue(/* numWorkers = */ 4, /* maxSize = */ 2);
|
||||
auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false);
|
||||
|
||||
ASSERT_TRUE(res);
|
||||
queue.stop();
|
||||
}
|
||||
|
||||
// Note: not using EXPECT_CLIO_ASSERT_FAIL because exception is swallowed by the WQ context
|
||||
// TODO [https://github.com/XRPLF/clio/issues/2906]: Enable the test once we figure out a better way to do it without
|
||||
// using up >2 minutes of CI time
|
||||
struct WorkQueueDeathTest : WorkQueueMockPrometheusTest, common::util::WithMockAssert {};
|
||||
TEST_F(WorkQueueDeathTest, DISABLED_ExecuteTaskAssertsWhenQueueIsEmpty)
|
||||
{
|
||||
[[maybe_unused]] auto& queuedMock = makeMock<CounterInt>("work_queue_queued_total_number", "");
|
||||
[[maybe_unused]] auto& durationMock = makeMock<CounterInt>("work_queue_cumulative_tasks_duration_us", "");
|
||||
auto& curSizeMock = makeMock<GaugeInt>("work_queue_current_size", "");
|
||||
|
||||
EXPECT_CALL(curSizeMock, value()).WillRepeatedly(::testing::Return(1)); // lie about the size
|
||||
EXPECT_DEATH(
|
||||
{
|
||||
WorkQueue queue(WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 2);
|
||||
queue.startProcessing(); // the actual queue is empty which will lead to assertion failure
|
||||
},
|
||||
".*"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user