fix: ETLng more cpu usage fixes (#2399)

This commit is contained in:
Alex Kremer
2025-08-06 16:50:48 +01:00
committed by GitHub
parent 6c9c88e3fc
commit 1a298dedd2
3 changed files with 30 additions and 9 deletions

View File

@@ -183,6 +183,7 @@ TaskManager::stop()
for (auto& loader : loaders_)
loader.abort();
queue_.stop();
wait();
}

View File

@@ -70,6 +70,13 @@ public:
~TaskManager() override;
TaskManager(TaskManager const&) = delete;
TaskManager(TaskManager&&) = delete;
TaskManager&
operator=(TaskManager const&) = delete;
TaskManager&
operator=(TaskManager&&) = delete;
void
run(std::size_t numExtractors) override;

View File

@@ -20,10 +20,10 @@
#pragma once
#include "etlng/Models.hpp"
#include "util/Assert.hpp"
#include "util/Mutex.hpp"
#include <boost/atomic/atomic.hpp>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
@@ -48,9 +48,6 @@ struct ReverseOrderComparator {
* @note This may be a candidate for future improvements if performance proves to be poor (e.g. use a lock free queue)
*/
class TaskQueue {
std::size_t limit_;
std::uint32_t increment_;
struct Data {
std::uint32_t expectedSequence;
std::priority_queue<model::LedgerData, std::vector<model::LedgerData>, ReverseOrderComparator> forwardLoadQueue;
@@ -60,9 +57,12 @@ class TaskQueue {
}
};
std::size_t limit_;
std::uint32_t increment_;
util::Mutex<Data> data_;
std::condition_variable cv_;
boost::atomics::atomic_bool stopping_ = false;
std::atomic_bool stopping_ = false;
public:
struct Settings {
@@ -83,9 +83,7 @@ public:
~TaskQueue()
{
// unblock all waiters
stopping_ = true;
cv_.notify_all();
ASSERT(stopping_, "stop() must be called before destroying the TaskQueue");
}
/**
@@ -149,9 +147,24 @@ public:
void
awaitTask()
{
if (stopping_)
return;
auto lock = data_.lock<std::unique_lock>();
cv_.wait(lock, [&] { return stopping_ or not lock->forwardLoadQueue.empty(); });
}
/**
* @brief Notify the queue that it's no longer needed
* @note This must be called before the queue is destroyed
*/
void
stop()
{
// unblock all waiters
stopping_ = true;
cv_.notify_all();
}
};
} // namespace etlng::impl