Add prometheus support (#950)

Fixes #888
This commit is contained in:
Sergey Kuznetsov
2023-11-02 17:26:03 +00:00
committed by GitHub
parent 320ebaa5d2
commit a16b680a7a
50 changed files with 4322 additions and 178 deletions

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <util/Fixtures.h>
#include <util/MockPrometheus.h>
#include <rpc/WorkQueue.h>
@@ -28,6 +29,7 @@
using namespace util;
using namespace rpc;
using namespace util::prometheus;
namespace {
constexpr auto JSONConfig = R"JSON({
@@ -36,36 +38,31 @@ constexpr auto JSONConfig = R"JSON({
})JSON";
} // namespace
class RPCWorkQueueTest : public NoLoggerFixture {
protected:
struct RPCWorkQueueTestBase : NoLoggerFixture {
Config cfg = Config{boost::json::parse(JSONConfig)};
WorkQueue queue = WorkQueue::make_WorkQueue(cfg);
};
struct RPCWorkQueueTest : WithPrometheus, RPCWorkQueueTestBase {};
TEST_F(RPCWorkQueueTest, WhitelistedExecutionCountAddsUp)
{
WorkQueue queue = WorkQueue::make_WorkQueue(cfg);
auto constexpr static TOTAL = 512u;
uint32_t executeCount = 0u;
std::binary_semaphore sem{0};
std::mutex mtx;
for (auto i = 0u; i < TOTAL; ++i) {
queue.postCoro(
[&executeCount, &sem, &mtx](auto /* yield */) {
[&executeCount, &mtx](auto /* yield */) {
std::lock_guard const lk(mtx);
if (++executeCount; executeCount == TOTAL)
sem.release(); // 1) note we are still in user function
++executeCount;
},
true
);
}
sem.acquire();
// 2) so we have to allow the size of queue to decrease by one asynchronously
std::this_thread::sleep_for(std::chrono::milliseconds{1});
queue.join();
auto const report = queue.report();
@@ -77,13 +74,10 @@ TEST_F(RPCWorkQueueTest, WhitelistedExecutionCountAddsUp)
TEST_F(RPCWorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
{
auto queue = WorkQueue::make_WorkQueue(cfg);
auto constexpr static TOTAL = 3u;
auto expectedCount = 2u;
auto unblocked = false;
std::binary_semaphore sem{0};
std::mutex mtx;
std::condition_variable cv;
@@ -93,8 +87,7 @@ TEST_F(RPCWorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
std::unique_lock lk{mtx};
cv.wait(lk, [&] { return unblocked; });
if (--expectedCount; expectedCount == 0)
sem.release();
--expectedCount;
},
false
);
@@ -110,6 +103,40 @@ TEST_F(RPCWorkQueueTest, NonWhitelistedPreventSchedulingAtQueueLimitExceeded)
}
}
sem.acquire();
queue.join();
EXPECT_TRUE(unblocked);
}
struct RPCWorkQueueMockPrometheusTest : WithMockPrometheus, RPCWorkQueueTestBase {};
TEST_F(RPCWorkQueueMockPrometheusTest, postCoroCouhters)
{
auto& queuedMock = makeMock<CounterInt>("work_queue_queued_total_number", "");
auto& durationMock = makeMock<CounterInt>("work_queue_cumulitive_tasks_duration_us", "");
auto& curSizeMock = makeMock<GaugeInt>("work_queue_current_size", "");
std::mutex mtx;
bool canContinue = false;
std::condition_variable cv;
EXPECT_CALL(curSizeMock, value()).WillOnce(::testing::Return(0));
EXPECT_CALL(curSizeMock, add(1));
EXPECT_CALL(queuedMock, add(1));
EXPECT_CALL(durationMock, add(::testing::Gt(0))).WillOnce([&](auto) {
EXPECT_CALL(curSizeMock, add(-1));
std::unique_lock const lk{mtx};
canContinue = true;
cv.notify_all();
});
auto const res = queue.postCoro(
[&](auto /* yield */) {
std::unique_lock lk{mtx};
cv.wait(lk, [&]() { return canContinue; });
},
false
);
ASSERT_TRUE(res);
queue.join();
}