mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
feat: WorkQueue priorities (#2721)
Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com> Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
This commit is contained in:
@@ -102,7 +102,7 @@ Counters::getMethodInfo(std::string const& method)
|
||||
return it->second;
|
||||
}
|
||||
|
||||
Counters::Counters(WorkQueue const& wq)
|
||||
Counters::Counters(Reportable const& wq)
|
||||
: tooBusyCounter_(
|
||||
PrometheusService::counterInt(
|
||||
"rpc_error_total_number",
|
||||
|
||||
@@ -66,7 +66,7 @@ class Counters {
|
||||
CounterType unknownCommandCounter_;
|
||||
CounterType internalErrorCounter_;
|
||||
|
||||
std::reference_wrapper<WorkQueue const> workQueue_;
|
||||
std::reference_wrapper<Reportable const> workQueue_;
|
||||
std::chrono::time_point<std::chrono::system_clock> startupTime_;
|
||||
|
||||
public:
|
||||
@@ -75,7 +75,7 @@ public:
|
||||
*
|
||||
* @param wq The work queue to operate on
|
||||
*/
|
||||
Counters(WorkQueue const& wq);
|
||||
Counters(Reportable const& wq);
|
||||
|
||||
/**
|
||||
* @brief A factory function that creates a new counters instance.
|
||||
|
||||
@@ -19,40 +19,28 @@
|
||||
|
||||
#include "rpc/WorkQueue.hpp"
|
||||
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/Spawn.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Label.hpp"
|
||||
#include "util/prometheus/Prometheus.hpp"
|
||||
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace rpc {
|
||||
|
||||
void
|
||||
WorkQueue::OneTimeCallable::setCallable(std::function<void()> func)
|
||||
{
|
||||
func_ = func;
|
||||
}
|
||||
|
||||
void
|
||||
WorkQueue::OneTimeCallable::operator()()
|
||||
{
|
||||
if (not called_) {
|
||||
func_();
|
||||
called_ = true;
|
||||
}
|
||||
}
|
||||
WorkQueue::OneTimeCallable::
|
||||
operator bool() const
|
||||
{
|
||||
return func_.operator bool();
|
||||
}
|
||||
|
||||
WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize)
|
||||
WorkQueue::WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize)
|
||||
: queued_{PrometheusService::counterInt(
|
||||
"work_queue_queued_total_number",
|
||||
util::prometheus::Labels(),
|
||||
@@ -69,25 +57,156 @@ WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize)
|
||||
"The current number of tasks in the queue"
|
||||
)}
|
||||
, ioc_{numWorkers}
|
||||
, strand_{ioc_.get_executor()}
|
||||
, waitTimer_(ioc_)
|
||||
{
|
||||
if (maxSize != 0)
|
||||
maxSize_ = maxSize;
|
||||
}
|
||||
|
||||
WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize)
|
||||
: WorkQueue(kDONT_START_PROCESSING_TAG, numWorkers, maxSize)
|
||||
{
|
||||
startProcessing();
|
||||
}
|
||||
|
||||
WorkQueue::~WorkQueue()
|
||||
{
|
||||
join();
|
||||
stop();
|
||||
}
|
||||
|
||||
void
|
||||
WorkQueue::stop(std::function<void()> onQueueEmpty)
|
||||
WorkQueue::startProcessing()
|
||||
{
|
||||
util::spawn(strand_, [this](auto yield) {
|
||||
ASSERT(not hasDispatcher_, "Dispatcher already running");
|
||||
|
||||
hasDispatcher_ = true;
|
||||
dispatcherLoop(yield);
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
WorkQueue::postCoro(TaskType func, bool isWhiteListed, Priority priority)
|
||||
{
|
||||
if (stopping_) {
|
||||
LOG(log_.warn()) << "Queue is stopping, rejecting incoming task.";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (size() >= maxSize_ && !isWhiteListed) {
|
||||
LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << size() << "; max size = " << maxSize_;
|
||||
return false;
|
||||
}
|
||||
|
||||
++curSize_.get();
|
||||
auto needsWakeup = false;
|
||||
|
||||
{
|
||||
auto state = dispatcherState_.lock();
|
||||
|
||||
needsWakeup = std::exchange(state->isIdle, false);
|
||||
|
||||
state->push(priority, std::move(func));
|
||||
}
|
||||
|
||||
if (needsWakeup)
|
||||
boost::asio::post(strand_, [this] { waitTimer_.cancel(); });
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
WorkQueue::dispatcherLoop(boost::asio::yield_context yield)
|
||||
{
|
||||
LOG(log_.info()) << "WorkQueue dispatcher starting";
|
||||
|
||||
// all ongoing tasks must be completed before stopping fully
|
||||
while (not stopping_ or size() > 0) {
|
||||
std::vector<TaskType> batch;
|
||||
|
||||
{
|
||||
auto state = dispatcherState_.lock();
|
||||
|
||||
if (state->empty()) {
|
||||
state->isIdle = true;
|
||||
} else {
|
||||
for (auto count = 0uz; count < kTAKE_HIGH_PRIO and not state->high.empty(); ++count) {
|
||||
batch.push_back(std::move(state->high.front()));
|
||||
state->high.pop();
|
||||
}
|
||||
|
||||
if (not state->normal.empty()) {
|
||||
batch.push_back(std::move(state->normal.front()));
|
||||
state->normal.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (not stopping_ and batch.empty()) {
|
||||
waitTimer_.expires_at(std::chrono::steady_clock::time_point::max());
|
||||
boost::system::error_code ec;
|
||||
waitTimer_.async_wait(yield[ec]);
|
||||
} else {
|
||||
for (auto task : std::move(batch)) {
|
||||
util::spawn(
|
||||
ioc_,
|
||||
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(task)](auto yield) mutable {
|
||||
auto const takenAt = std::chrono::system_clock::now();
|
||||
auto const waited =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();
|
||||
|
||||
++queued_.get();
|
||||
durationUs_.get() += waited;
|
||||
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();
|
||||
|
||||
task(yield);
|
||||
|
||||
--curSize_.get();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
boost::asio::post(ioc_.get_executor(), yield); // yield back to avoid hijacking the thread
|
||||
}
|
||||
}
|
||||
|
||||
LOG(log_.info()) << "WorkQueue dispatcher shutdown requested - time to execute onTasksComplete";
|
||||
|
||||
{
|
||||
auto onTasksComplete = onQueueEmpty_.lock();
|
||||
ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
|
||||
onTasksComplete->operator()();
|
||||
}
|
||||
|
||||
LOG(log_.info()) << "WorkQueue dispatcher finished";
|
||||
}
|
||||
|
||||
void
|
||||
WorkQueue::requestStop(std::function<void()> onQueueEmpty)
|
||||
{
|
||||
auto handler = onQueueEmpty_.lock();
|
||||
handler->setCallable(std::move(onQueueEmpty));
|
||||
*handler = std::move(onQueueEmpty);
|
||||
|
||||
stopping_ = true;
|
||||
if (size() == 0) {
|
||||
handler->operator()();
|
||||
auto needsWakeup = false;
|
||||
|
||||
{
|
||||
auto state = dispatcherState_.lock();
|
||||
needsWakeup = std::exchange(state->isIdle, false);
|
||||
}
|
||||
|
||||
if (needsWakeup)
|
||||
boost::asio::post(strand_, [this] { waitTimer_.cancel(); });
|
||||
}
|
||||
|
||||
void
|
||||
WorkQueue::stop()
|
||||
{
|
||||
if (not stopping_.exchange(true))
|
||||
requestStop();
|
||||
|
||||
ioc_.join();
|
||||
}
|
||||
|
||||
WorkQueue
|
||||
@@ -115,12 +234,6 @@ WorkQueue::report() const
|
||||
return obj;
|
||||
}
|
||||
|
||||
void
|
||||
WorkQueue::join()
|
||||
{
|
||||
ioc_.join();
|
||||
}
|
||||
|
||||
size_t
|
||||
WorkQueue::size() const
|
||||
{
|
||||
|
||||
@@ -19,9 +19,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/Mutex.hpp"
|
||||
#include "util/Spawn.hpp"
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Counter.hpp"
|
||||
@@ -29,23 +27,80 @@
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
#include <boost/json.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <queue>
|
||||
|
||||
namespace rpc {
|
||||
|
||||
/**
|
||||
* @brief An interface for any class providing a report as json object
|
||||
*/
|
||||
struct Reportable {
|
||||
virtual ~Reportable() = default;
|
||||
|
||||
/**
|
||||
* @brief Generate a report of the work queue state.
|
||||
*
|
||||
* @return The report as a JSON object.
|
||||
*/
|
||||
[[nodiscard]] virtual boost::json::object
|
||||
report() const = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief An asynchronous, thread-safe queue for RPC requests.
|
||||
*/
|
||||
class WorkQueue {
|
||||
class WorkQueue : public Reportable {
|
||||
using TaskType = std::function<void(boost::asio::yield_context)>;
|
||||
using QueueType = std::queue<TaskType>;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Represents a task scheduling priority
|
||||
*/
|
||||
enum class Priority : uint8_t {
|
||||
High,
|
||||
Default,
|
||||
};
|
||||
|
||||
private:
|
||||
struct DispatcherState {
|
||||
QueueType high;
|
||||
QueueType normal;
|
||||
|
||||
bool isIdle = false;
|
||||
|
||||
void
|
||||
push(Priority priority, auto&& task)
|
||||
{
|
||||
auto& queue = [this, priority] -> QueueType& {
|
||||
if (priority == Priority::High)
|
||||
return high;
|
||||
return normal;
|
||||
}();
|
||||
queue.push(std::forward<decltype(task)>(task));
|
||||
}
|
||||
|
||||
[[nodiscard]] bool
|
||||
empty() const
|
||||
{
|
||||
return high.empty() and normal.empty();
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
static constexpr auto kTAKE_HIGH_PRIO = 4uz;
|
||||
|
||||
// these are cumulative for the lifetime of the process
|
||||
std::reference_wrapper<util::prometheus::CounterInt> queued_;
|
||||
std::reference_wrapper<util::prometheus::CounterInt> durationUs_;
|
||||
@@ -55,33 +110,46 @@ class WorkQueue {
|
||||
|
||||
util::Logger log_{"RPC"};
|
||||
boost::asio::thread_pool ioc_;
|
||||
boost::asio::strand<boost::asio::thread_pool::executor_type> strand_;
|
||||
bool hasDispatcher_ = false;
|
||||
|
||||
std::atomic_bool stopping_;
|
||||
|
||||
class OneTimeCallable {
|
||||
std::function<void()> func_;
|
||||
bool called_{false};
|
||||
|
||||
public:
|
||||
void
|
||||
setCallable(std::function<void()> func);
|
||||
|
||||
void
|
||||
operator()();
|
||||
|
||||
operator bool() const;
|
||||
};
|
||||
util::Mutex<OneTimeCallable> onQueueEmpty_;
|
||||
util::Mutex<std::function<void()>> onQueueEmpty_;
|
||||
util::Mutex<DispatcherState> dispatcherState_;
|
||||
boost::asio::steady_timer waitTimer_;
|
||||
|
||||
public:
|
||||
struct DontStartProcessingTag {};
|
||||
static constexpr DontStartProcessingTag kDONT_START_PROCESSING_TAG = {};
|
||||
|
||||
/**
|
||||
* @brief Create an we instance of the work queue.
|
||||
* @brief Create an instance of the work queue.
|
||||
*
|
||||
* The work queue immediately starts to process tasks as they come.
|
||||
*
|
||||
* @param numWorkers The amount of threads to spawn in the pool
|
||||
* @param maxSize The maximum capacity of the queue; 0 means unlimited
|
||||
*/
|
||||
WorkQueue(std::uint32_t numWorkers, uint32_t maxSize = 0);
|
||||
~WorkQueue();
|
||||
|
||||
/**
|
||||
* @brief Create an instance of the work queue without starting the processing of events.
|
||||
*
|
||||
* Clients are expected to call `startProcessing` manually once ready to start processing tasks.
|
||||
*
|
||||
* @param numWorkers The amount of threads to spawn in the pool
|
||||
* @param maxSize The maximum capacity of the queue; 0 means unlimited
|
||||
*/
|
||||
WorkQueue(DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize = 0);
|
||||
|
||||
~WorkQueue() override;
|
||||
|
||||
/**
|
||||
* @brief Start processing of the enqueued tasks.
|
||||
*/
|
||||
void
|
||||
startProcessing();
|
||||
|
||||
/**
|
||||
* @brief Put the work queue into a stopping state. This will prevent new jobs from being queued.
|
||||
@@ -89,7 +157,13 @@ public:
|
||||
* @param onQueueEmpty A callback to run when the last task in the queue is completed
|
||||
*/
|
||||
void
|
||||
stop(std::function<void()> onQueueEmpty);
|
||||
requestStop(std::function<void()> onQueueEmpty = [] {});
|
||||
|
||||
/**
|
||||
* @brief Put the work queue into a stopping state and await workers to finish.
|
||||
*/
|
||||
void
|
||||
stop();
|
||||
|
||||
/**
|
||||
* @brief A factory function that creates the work queue based on a config.
|
||||
@@ -97,7 +171,7 @@ public:
|
||||
* @param config The Clio config to use
|
||||
* @return The work queue
|
||||
*/
|
||||
static WorkQueue
|
||||
[[nodiscard]] static WorkQueue
|
||||
makeWorkQueue(util::config::ClioConfigDefinition const& config);
|
||||
|
||||
/**
|
||||
@@ -105,60 +179,21 @@ public:
|
||||
*
|
||||
* The job will be rejected if isWhiteListed is set to false and the current size of the queue reached capacity.
|
||||
*
|
||||
* @tparam FnType The function object type
|
||||
* @param func The function object to queue as a job
|
||||
* @param isWhiteListed Whether the queue capacity applies to this job
|
||||
* @param priority The priority of the task
|
||||
* @return true if the job was successfully queued; false otherwise
|
||||
*/
|
||||
template <typename FnType>
|
||||
bool
|
||||
postCoro(FnType&& func, bool isWhiteListed)
|
||||
{
|
||||
if (stopping_) {
|
||||
LOG(log_.warn()) << "Queue is stopping, rejecting incoming task.";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (curSize_.get().value() >= maxSize_ && !isWhiteListed) {
|
||||
LOG(log_.warn()) << "Queue is full. rejecting job. current size = " << curSize_.get().value()
|
||||
<< "; max size = " << maxSize_;
|
||||
return false;
|
||||
}
|
||||
|
||||
++curSize_.get();
|
||||
|
||||
// Each time we enqueue a job, we want to post a symmetrical job that will dequeue and run the job at the front
|
||||
// of the job queue.
|
||||
util::spawn(
|
||||
ioc_,
|
||||
[this, func = std::forward<FnType>(func), start = std::chrono::system_clock::now()](auto yield) mutable {
|
||||
auto const run = std::chrono::system_clock::now();
|
||||
auto const wait = std::chrono::duration_cast<std::chrono::microseconds>(run - start).count();
|
||||
|
||||
++queued_.get();
|
||||
durationUs_.get() += wait;
|
||||
LOG(log_.info()) << "WorkQueue wait time = " << wait << " queue size = " << curSize_.get().value();
|
||||
|
||||
func(yield);
|
||||
--curSize_.get();
|
||||
if (curSize_.get().value() == 0 && stopping_) {
|
||||
auto onTasksComplete = onQueueEmpty_.lock();
|
||||
ASSERT(onTasksComplete->operator bool(), "onTasksComplete must be set when stopping is true.");
|
||||
onTasksComplete->operator()();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
postCoro(TaskType func, bool isWhiteListed, Priority priority = Priority::Default);
|
||||
|
||||
/**
|
||||
* @brief Generate a report of the work queue state.
|
||||
*
|
||||
* @return The report as a JSON object.
|
||||
*/
|
||||
boost::json::object
|
||||
report() const;
|
||||
[[nodiscard]] boost::json::object
|
||||
report() const override;
|
||||
|
||||
/**
|
||||
* @brief Wait until all the jobs in the queue are finished.
|
||||
@@ -171,8 +206,12 @@ public:
|
||||
*
|
||||
* @return The number of jobs in the queue.
|
||||
*/
|
||||
size_t
|
||||
[[nodiscard]] size_t
|
||||
size() const;
|
||||
|
||||
private:
|
||||
void
|
||||
dispatcherLoop(boost::asio::yield_context yield);
|
||||
};
|
||||
|
||||
} // namespace rpc
|
||||
|
||||
@@ -52,6 +52,8 @@ target_link_libraries(
|
||||
clio_util
|
||||
PUBLIC Boost::headers
|
||||
Boost::iostreams
|
||||
Boost::coroutine
|
||||
Boost::context
|
||||
fmt::fmt
|
||||
openssl::openssl
|
||||
xrpl::libxrpl
|
||||
|
||||
@@ -185,6 +185,7 @@ struct MockPrometheusImpl : PrometheusInterface {
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::unique_ptr<MetricBase>> metrics;
|
||||
|
||||
std::unordered_map<std::string, ::testing::StrictMock<MockCounterImplInt>> counterIntImpls;
|
||||
std::unordered_map<std::string, ::testing::StrictMock<MockCounterImplUint>> counterUintImpls;
|
||||
std::unordered_map<std::string, ::testing::StrictMock<MockCounterImplDouble>> counterDoubleImpls;
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "util/MockPrometheus.hpp"
|
||||
#include "util/prometheus/Counter.hpp"
|
||||
|
||||
#include <boost/json/object.hpp>
|
||||
#include <boost/json/value_to.hpp>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
@@ -37,9 +38,15 @@ using util::prometheus::CounterInt;
|
||||
using util::prometheus::WithMockPrometheus;
|
||||
using util::prometheus::WithPrometheus;
|
||||
|
||||
struct RPCCountersTest : WithPrometheus {
|
||||
WorkQueue queue{4u, 1024u}; // todo: mock instead
|
||||
Counters counters{queue};
|
||||
struct RPCCountersBaseTest {
|
||||
struct WorkQueueMock : Reportable {
|
||||
MOCK_METHOD(boost::json::object, report, (), (const, override));
|
||||
};
|
||||
testing::StrictMock<WorkQueueMock> workQueueMock;
|
||||
};
|
||||
|
||||
struct RPCCountersTest : WithPrometheus, RPCCountersBaseTest {
|
||||
Counters counters{workQueueMock};
|
||||
};
|
||||
|
||||
TEST_F(RPCCountersTest, CheckThatCountersAddUp)
|
||||
@@ -57,6 +64,7 @@ TEST_F(RPCCountersTest, CheckThatCountersAddUp)
|
||||
counters.onInternalError();
|
||||
}
|
||||
|
||||
EXPECT_CALL(workQueueMock, report);
|
||||
auto const report = counters.report();
|
||||
auto const& rpc = report.at(JS(rpc)).as_object();
|
||||
|
||||
@@ -103,13 +111,10 @@ TEST_F(RPCCountersTest, CheckThatCountersAddUp)
|
||||
EXPECT_EQ(boost::json::value_to<std::string>(report.at("bad_syntax_errors")), "512");
|
||||
EXPECT_EQ(boost::json::value_to<std::string>(report.at("unknown_command_errors")), "512");
|
||||
EXPECT_EQ(boost::json::value_to<std::string>(report.at("internal_errors")), "512");
|
||||
|
||||
EXPECT_EQ(report.at("work_queue"), queue.report()); // Counters report includes queue report
|
||||
}
|
||||
|
||||
struct RPCCountersMockPrometheusTests : WithMockPrometheus {
|
||||
WorkQueue queue{4u, 1024u}; // todo: mock instead
|
||||
Counters counters{queue};
|
||||
struct RPCCountersMockPrometheusTests : WithMockPrometheus, RPCCountersBaseTest {
|
||||
Counters counters{workQueueMock};
|
||||
};
|
||||
|
||||
TEST_F(RPCCountersMockPrometheusTests, rpcFailed)
|
||||
|
||||
@@ -30,8 +30,10 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <mutex>
|
||||
#include <semaphore>
|
||||
#include <vector>
|
||||
|
||||
using namespace util;
|
||||
using namespace util::config;
|
||||
@@ -39,15 +41,24 @@ using namespace rpc;
|
||||
using namespace util::prometheus;
|
||||
|
||||
struct RPCWorkQueueTestBase : public virtual ::testing::Test {
|
||||
ClioConfigDefinition cfg = {
|
||||
{"server.max_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(2)},
|
||||
{"workers", ConfigValue{ConfigType::Integer}.defaultValue(4)}
|
||||
};
|
||||
ClioConfigDefinition cfg;
|
||||
WorkQueue queue;
|
||||
|
||||
WorkQueue queue = WorkQueue::makeWorkQueue(cfg);
|
||||
RPCWorkQueueTestBase(uint32_t workers, uint32_t maxQueueSize)
|
||||
: cfg{
|
||||
{"server.max_queue_size", ConfigValue{ConfigType::Integer}.defaultValue(maxQueueSize)},
|
||||
{"workers", ConfigValue{ConfigType::Integer}.defaultValue(workers)},
|
||||
}
|
||||
, queue{WorkQueue::makeWorkQueue(cfg)}
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase {};
|
||||
struct WorkQueueTest : WithPrometheus, RPCWorkQueueTestBase {
|
||||
WorkQueueTest() : RPCWorkQueueTestBase(/* workers = */ 4, /* maxQueueSize = */ 2)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp)
|
||||
{
|
||||
@@ -55,10 +66,10 @@ TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp)
|
||||
std::atomic_uint32_t executeCount = 0u;
|
||||
|
||||
for (auto i = 0u; i < kTOTAL; ++i) {
|
||||
queue.postCoro([&executeCount](auto /* yield */) { ++executeCount; }, true);
|
||||
queue.postCoro([&executeCount](auto /* yield */) { ++executeCount; }, /* isWhiteListed = */ true);
|
||||
}
|
||||
|
||||
queue.join();
|
||||
queue.stop();
|
||||
|
||||
auto const report = queue.report();
|
||||
|
||||
@@ -71,7 +82,6 @@ TEST_F(WorkQueueTest, WhitelistedExecutionCountAddsUp)
|
||||
TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
|
||||
{
|
||||
static constexpr auto kTOTAL = 3u;
|
||||
auto expectedCount = 2u;
|
||||
auto unblocked = false;
|
||||
|
||||
std::mutex mtx;
|
||||
@@ -82,10 +92,8 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
|
||||
[&](auto /* yield */) {
|
||||
std::unique_lock lk{mtx};
|
||||
cv.wait(lk, [&] { return unblocked; });
|
||||
|
||||
--expectedCount;
|
||||
},
|
||||
false
|
||||
/* isWhiteListed = */ false
|
||||
);
|
||||
|
||||
if (i == kTOTAL - 1) {
|
||||
@@ -99,10 +107,60 @@ TEST_F(WorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
|
||||
}
|
||||
}
|
||||
|
||||
queue.join();
|
||||
queue.stop();
|
||||
EXPECT_TRUE(unblocked);
|
||||
}
|
||||
|
||||
struct WorkQueuePriorityTest : WithPrometheus, virtual ::testing::Test {
|
||||
WorkQueue queue{WorkQueue::kDONT_START_PROCESSING_TAG, /* numWorkers = */ 1, /* maxSize = */ 100};
|
||||
};
|
||||
|
||||
TEST_F(WorkQueuePriorityTest, HighPriorityTasks)
|
||||
{
|
||||
static constexpr auto kTOTAL = 10;
|
||||
std::vector<WorkQueue::Priority> executionOrder;
|
||||
std::mutex mtx;
|
||||
|
||||
for (int i = 0; i < kTOTAL; ++i) {
|
||||
queue.postCoro(
|
||||
[&](auto) {
|
||||
std::lock_guard const lock(mtx);
|
||||
executionOrder.push_back(WorkQueue::Priority::High);
|
||||
},
|
||||
/* isWhiteListed = */ true,
|
||||
WorkQueue::Priority::High
|
||||
);
|
||||
queue.postCoro(
|
||||
[&](auto) {
|
||||
std::lock_guard const lock(mtx);
|
||||
executionOrder.push_back(WorkQueue::Priority::Default);
|
||||
},
|
||||
/* isWhiteListed = */ true,
|
||||
WorkQueue::Priority::Default
|
||||
);
|
||||
}
|
||||
|
||||
queue.startProcessing();
|
||||
queue.stop();
|
||||
|
||||
// with 1 worker and the above, the execution order is deterministic
|
||||
// we should see 4 high prio tasks, then 1 normal prio task, until high prio tasks are depleted
|
||||
std::vector<WorkQueue::Priority> const expectedOrder = {
|
||||
WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High,
|
||||
WorkQueue::Priority::High, WorkQueue::Priority::Default, WorkQueue::Priority::High,
|
||||
WorkQueue::Priority::High, WorkQueue::Priority::High, WorkQueue::Priority::High,
|
||||
WorkQueue::Priority::Default, WorkQueue::Priority::High, WorkQueue::Priority::High,
|
||||
WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default,
|
||||
WorkQueue::Priority::Default, WorkQueue::Priority::Default, WorkQueue::Priority::Default,
|
||||
WorkQueue::Priority::Default, WorkQueue::Priority::Default,
|
||||
};
|
||||
|
||||
ASSERT_EQ(executionOrder.size(), expectedOrder.size());
|
||||
for (auto i = 0uz; i < executionOrder.size(); ++i) {
|
||||
EXPECT_EQ(executionOrder[i], expectedOrder[i]) << "Mismatch at index " << i;
|
||||
}
|
||||
}
|
||||
|
||||
struct WorkQueueStopTest : WorkQueueTest {
|
||||
testing::StrictMock<testing::MockFunction<void()>> onTasksComplete;
|
||||
testing::StrictMock<testing::MockFunction<void()>> taskMock;
|
||||
@@ -111,23 +169,24 @@ struct WorkQueueStopTest : WorkQueueTest {
|
||||
TEST_F(WorkQueueStopTest, RejectsNewTasksWhenStopping)
|
||||
{
|
||||
EXPECT_CALL(taskMock, Call());
|
||||
EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, false));
|
||||
EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false));
|
||||
|
||||
queue.stop([]() {});
|
||||
EXPECT_FALSE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, false));
|
||||
queue.requestStop();
|
||||
EXPECT_FALSE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false));
|
||||
|
||||
queue.join();
|
||||
queue.stop();
|
||||
}
|
||||
|
||||
TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingAndQueueIsEmpty)
|
||||
{
|
||||
EXPECT_CALL(taskMock, Call());
|
||||
EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, false));
|
||||
EXPECT_TRUE(queue.postCoro([this](auto /* yield */) { taskMock.Call(); }, /* isWhiteListed = */ false));
|
||||
|
||||
EXPECT_CALL(onTasksComplete, Call()).WillOnce([&]() { EXPECT_EQ(queue.size(), 0u); });
|
||||
queue.stop(onTasksComplete.AsStdFunction());
|
||||
queue.join();
|
||||
queue.requestStop(onTasksComplete.AsStdFunction());
|
||||
queue.stop();
|
||||
}
|
||||
|
||||
TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask)
|
||||
{
|
||||
std::binary_semaphore semaphore{0};
|
||||
@@ -138,19 +197,23 @@ TEST_F(WorkQueueStopTest, CallsOnTasksCompleteWhenStoppingOnLastTask)
|
||||
taskMock.Call();
|
||||
semaphore.acquire();
|
||||
},
|
||||
false
|
||||
/* isWhiteListed = */ false
|
||||
));
|
||||
|
||||
EXPECT_CALL(onTasksComplete, Call()).WillOnce([&]() { EXPECT_EQ(queue.size(), 0u); });
|
||||
queue.stop(onTasksComplete.AsStdFunction());
|
||||
queue.requestStop(onTasksComplete.AsStdFunction());
|
||||
semaphore.release();
|
||||
|
||||
queue.join();
|
||||
queue.stop();
|
||||
}
|
||||
|
||||
struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase {};
|
||||
struct WorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase {
|
||||
WorkQueueMockPrometheusTest() : RPCWorkQueueTestBase(/* workers = */ 1, /*maxQueueSize = */ 2)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(WorkQueueMockPrometheusTest, postCoroCouhters)
|
||||
TEST_F(WorkQueueMockPrometheusTest, postCoroCounters)
|
||||
{
|
||||
auto& queuedMock = makeMock<CounterInt>("work_queue_queued_total_number", "");
|
||||
auto& durationMock = makeMock<CounterInt>("work_queue_cumulative_tasks_duration_us", "");
|
||||
@@ -158,16 +221,17 @@ TEST_F(WorkQueueMockPrometheusTest, postCoroCouhters)
|
||||
|
||||
std::binary_semaphore semaphore{0};
|
||||
|
||||
EXPECT_CALL(curSizeMock, value()).Times(2).WillRepeatedly(::testing::Return(0));
|
||||
EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0)).WillRepeatedly(::testing::Return(1));
|
||||
EXPECT_CALL(curSizeMock, add(1));
|
||||
EXPECT_CALL(queuedMock, add(1));
|
||||
EXPECT_CALL(durationMock, add(::testing::Gt(0))).WillOnce([&](auto) {
|
||||
EXPECT_CALL(durationMock, add(::testing::Ge(0))).WillOnce([&](auto) {
|
||||
EXPECT_CALL(curSizeMock, add(-1));
|
||||
EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0));
|
||||
semaphore.release();
|
||||
});
|
||||
|
||||
auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, false);
|
||||
auto const res = queue.postCoro([&](auto /* yield */) { semaphore.acquire(); }, /* isWhiteListed = */ false);
|
||||
|
||||
ASSERT_TRUE(res);
|
||||
queue.join();
|
||||
queue.stop();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user