Simplify the Job Queue:

This is a refactor aimed at cleaning up and simplifying the existing
job queue.

As of now, all jobs are cancelled at the same time and in the same
way, so this commit removes the per-job cancellation token. If the
need for such support is demonstrated, support can be re-added.

* Revise documentation for ClosureCounter and Workers.
* Simplify code, removing unnecessary function arguments and
  deduplicating expressions.
* Restructure job handlers to no longer need to pass a job's
  handle to the job.
This commit is contained in:
John Freeman
2021-11-16 10:55:52 -06:00
committed by Nik Bougalis
parent df02eb125f
commit c2a08a1f26
31 changed files with 164 additions and 242 deletions

View File

@@ -36,10 +36,8 @@ Job::Job(
std::string const& name,
std::uint64_t index,
LoadMonitor& lm,
std::function<void(Job&)> const& job,
CancelCallback cancelCallback)
: m_cancelCallback(cancelCallback)
, mType(type)
std::function<void()> const& job)
: mType(type)
, mJobIndex(index)
, mJob(job)
, mName(name)
@@ -54,27 +52,12 @@ Job::getType() const
return mType;
}
Job::CancelCallback
Job::getCancelCallback() const
{
assert(m_cancelCallback);
return m_cancelCallback;
}
Job::clock_type::time_point const&
Job::queue_time() const
{
return m_queue_time;
}
bool
Job::shouldCancel() const
{
if (m_cancelCallback)
return m_cancelCallback();
return false;
}
void
Job::doJob()
{
@@ -82,19 +65,13 @@ Job::doJob()
m_loadEvent->start();
m_loadEvent->setName(mName);
mJob(*this);
mJob();
// Destroy the lambda, otherwise we won't include
// its duration in the time measurement
mJob = nullptr;
}
void
Job::rename(std::string const& newName)
{
mName = newName;
}
bool
Job::operator>(const Job& j) const
{

View File

@@ -35,7 +35,6 @@ JobQueue::JobQueue(
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)
, m_workers(*this, &perfLog, "JobQueue", threadCount)
, m_cancelCallback(std::bind(&JobQueue::isStopping, this))
, perfLog_(perfLog)
, m_collector(collector)
{
@@ -100,9 +99,27 @@ JobQueue::addRefCountedJob(
{
std::lock_guard lock(m_mutex);
auto result = m_jobSet.emplace(
type, name, ++m_lastJob, data.load(), func, m_cancelCallback);
queueJob(*result.first, lock);
auto result =
m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
auto const& job = *result.first;
JobType const type(job.getType());
assert(type != jtINVALID);
assert(m_jobSet.find(job) != m_jobSet.end());
perfLog_.jobQueue(type);
JobTypeData& data(getJobTypeData(type));
if (data.waiting + data.running < getJobLimit(type))
{
m_workers.addTask();
}
else
{
// defer the task until we go below the limit
++data.deferred;
}
++data.waiting;
}
return true;
}
@@ -282,29 +299,6 @@ JobQueue::isStopped() const
return stopped_;
}
void
JobQueue::queueJob(Job const& job, std::lock_guard<std::mutex> const& lock)
{
JobType const type(job.getType());
assert(type != jtINVALID);
assert(m_jobSet.find(job) != m_jobSet.end());
perfLog_.jobQueue(type);
JobTypeData& data(getJobTypeData(type));
if (data.waiting + data.running < getJobLimit(type))
{
m_workers.addTask();
}
else
{
// defer the task until we go below the limit
//
++data.deferred;
}
++data.waiting;
}
void
JobQueue::getNextJob(Job& job)
{
@@ -313,30 +307,25 @@ JobQueue::getNextJob(Job& job)
std::set<Job>::const_iterator iter;
for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
{
JobTypeData& data(getJobTypeData(iter->getType()));
JobType const type = iter->getType();
assert(type != jtINVALID);
assert(data.running <= getJobLimit(data.type()));
JobTypeData& data(getJobTypeData(type));
assert(data.running <= getJobLimit(type));
// Run this job if we're running below the limit.
if (data.running < getJobLimit(data.type()))
{
assert(data.waiting > 0);
--data.waiting;
++data.running;
break;
}
}
assert(iter != m_jobSet.end());
JobType const type = iter->getType();
JobTypeData& data(getJobTypeData(type));
assert(type != jtINVALID);
job = *iter;
m_jobSet.erase(iter);
--data.waiting;
++data.running;
}
void

View File

@@ -254,7 +254,7 @@ public:
// There is a separate check in `checkpoint` for a valid
// connection in the rare case when the DatabaseCon is destroyed
// after locking this weak_ptr
[wp = std::weak_ptr<Checkpointer>{shared_from_this()}](Job&) {
[wp = std::weak_ptr<Checkpointer>{shared_from_this()}]() {
if (auto self = wp.lock())
self->checkpoint();
}))

View File

@@ -37,24 +37,35 @@ class PerfLog;
/**
* `Workers` is effectively a thread pool. The constructor takes a "callback"
* that has a `void processTask(int instance)` method, and a number of
* workers. It creates that many Workers and then waits for calls to
* workers. It creates that many `Worker`s and then waits for calls to
* `Workers::addTask()`. It holds a semaphore that counts the number of
* waiting tasks, and a condition variable for the event when the last worker
* pauses itself.
* pending "tasks", and a condition variable for the event when the last
* worker pauses itself.
*
* A "task" is just a call to the callback's `processTask` method.
* "Adding a task" means calling that method now, or remembering to call it in
* the future.
* This is implemented with a semaphore.
* If there are any workers waiting when a task is added, then one will be
* woken to claim the task.
* If not, then the next worker to wait on the semaphore will claim the task.
*
* Creating a `Worker` creates a thread that calls `Worker::run()`. When that
* thread enters `Worker::run`, it increments the count of active workers in
* the parent `Workers` object and then blocks on the semaphore if there are
* no waiting tasks. It will be unblocked whenever the number of waiting tasks
* is incremented. That only happens in two circumstances: (1) when
* the parent `Workers` object and then tries to claim a task, which blocks if
* there are none pending.
* It will be unblocked whenever the semaphore is notified (i.e. when the
* number of pending tasks is incremented).
* That only happens in two circumstances: (1) when
* `Workers::addTask` is called and (2) when `Workers` wants to pause some
* workers ("pause one worker" is considered one task), which happens when
* someone wants to stop the workers or shrink the threadpool. No worker
* threads are ever destroyed until `Workers` is destroyed; it merely pauses
* workers until then.
*
* When an idle worker is woken, it checks whether `Workers` is trying to pause
* workers. If so, it adds itself to the set of paused workers and blocks on
* When a waiting worker is woken, it checks whether `Workers` is trying to
* pause workers. If so, it changes its status from active to paused and
* blocks on
* its own condition variable. If not, then it calls `processTask` on the
* "callback" held by `Workers`.
*
@@ -62,8 +73,8 @@ class PerfLog;
* to exit is only set in the destructor of `Worker`, which unblocks the
* paused thread and waits for it to exit. A `Worker::run` thread checks
* whether it needs to exit only when it is woken from a pause (not when it is
* woken from idle). This is why the destructor for `Workers` pauses all the
* workers before destroying them.
* woken from waiting). This is why the destructor for `Workers` pauses all
* the workers before destroying them.
*/
class Workers
{