Redesign stoppable object pattern

This commit is contained in:
John Freeman
2021-05-26 17:45:41 -05:00
committed by manojsdoshi
parent c10c0be11b
commit a2a37a928a
92 changed files with 781 additions and 2460 deletions

View File

@@ -20,22 +20,21 @@
#include <ripple/basics/PerfLog.h>
#include <ripple/basics/contract.h>
#include <ripple/core/JobQueue.h>
#include <mutex>
namespace ripple {
JobQueue::JobQueue(
beast::insight::Collector::ptr const& collector,
Stoppable& parent,
beast::Journal journal,
Logs& logs,
perf::PerfLog& perfLog)
: Stoppable("JobQueue", parent)
, m_journal(journal)
: m_journal(journal)
, m_lastJob(0)
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)
, m_workers(*this, &perfLog, "JobQueue", 0)
, m_cancelCallback(std::bind(&Stoppable::isStopping, this))
, m_cancelCallback(std::bind(&JobQueue::isStopping, this))
, perfLog_(perfLog)
, m_collector(collector)
{
@@ -96,24 +95,8 @@ JobQueue::addRefCountedJob(
{
std::lock_guard lock(m_mutex);
// If this goes off it means that a child didn't follow
// the Stoppable API rules. A job may only be added if:
//
// - The JobQueue has NOT stopped
// AND
// * We are currently processing jobs
// OR
// * We have have pending jobs
// OR
// * Not all children are stopped
//
assert(
!isStopped() &&
(m_processCount > 0 || !m_jobSet.empty() || !areChildrenStopped()));
std::pair<std::set<Job>::iterator, bool> result(m_jobSet.insert(
Job(type, name, ++m_lastJob, data.load(), func, m_cancelCallback)));
auto result = m_jobSet.emplace(
type, name, ++m_lastJob, data.load(), func, m_cancelCallback);
queueJob(*result.first, lock);
}
return true;
@@ -278,7 +261,7 @@ void
JobQueue::rendezvous()
{
std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait(lock, [&] { return m_processCount == 0 && m_jobSet.empty(); });
cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
}
JobTypeData&
@@ -296,28 +279,31 @@ JobQueue::getJobTypeData(JobType type)
}
void
JobQueue::onStop()
JobQueue::stop()
{
// onStop must be defined and empty here,
// otherwise the base class will do the wrong thing.
stopping_ = true;
using namespace std::chrono_literals;
jobCounter_.join("JobQueue", 1s, m_journal);
{
// After the JobCounter is joined, all jobs have finished executing
// (i.e. returned from `Job::doJob`) and no more are being accepted,
// but there may still be some threads between the return of
// `Job::doJob` and the return of `JobQueue::processTask`. That is why
// we must wait on the condition variable to make these assertions.
std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait(
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
assert(m_processCount == 0);
assert(m_jobSet.empty());
assert(nSuspend_ == 0);
stopped_ = true;
}
}
void
JobQueue::checkStopped(std::lock_guard<std::mutex> const& lock)
bool
JobQueue::isStopped() const
{
// We are stopped when all of the following are true:
//
// 1. A stop notification was received
// 2. All Stoppable children have stopped
// 3. There are no executing calls to processTask
// 4. There are no remaining Jobs in the job set
// 5. There are no suspended coroutines
//
if (isStopping() && areChildrenStopped() && (m_processCount == 0) &&
m_jobSet.empty() && nSuspend_ == 0)
{
stopped();
}
return stopped_;
}
void
@@ -437,13 +423,12 @@ JobQueue::processTask(int instance)
{
std::lock_guard lock(m_mutex);
// Job should be destroyed before calling checkStopped
// Job should be destroyed before stopping
// otherwise destructors with side effects can access
// parent objects that are already destroyed.
finishJob(type);
if (--m_processCount == 0 && m_jobSet.empty())
cv_.notify_all();
checkStopped(lock);
}
// Note that when Job::~Job is called, the last reference
@@ -459,11 +444,4 @@ JobQueue::getJobLimit(JobType type)
return j.limit();
}
void
JobQueue::onChildrenStopped()
{
std::lock_guard lock(m_mutex);
checkStopped(lock);
}
} // namespace ripple

View File

@@ -1,221 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/basics/contract.h>
#include <ripple/core/Stoppable.h>
#include <cassert>
namespace ripple {
Stoppable::Stoppable(std::string name, RootStoppable& root)
: m_name(std::move(name)), m_root(root), m_child(this)
{
}
Stoppable::Stoppable(std::string name, Stoppable& parent)
: m_name(std::move(name)), m_root(parent.m_root), m_child(this)
{
setParent(parent);
}
Stoppable::~Stoppable()
{
}
void
Stoppable::setParent(Stoppable& parent)
{
assert(!hasParent_);
assert(!parent.isStopping());
assert(std::addressof(m_root) == std::addressof(parent.m_root));
parent.m_children.push_front(&m_child);
hasParent_ = true;
}
bool
Stoppable::isStopping() const
{
return m_root.isStopping();
}
bool
Stoppable::isStopped() const
{
return m_stopped;
}
bool
Stoppable::areChildrenStopped() const
{
return m_childrenStopped;
}
void
Stoppable::stopped()
{
std::lock_guard lk{m_mut};
m_is_stopping = true;
m_cv.notify_all();
}
void
Stoppable::onPrepare()
{
}
void
Stoppable::onStart()
{
}
void
Stoppable::onStop()
{
stopped();
}
void
Stoppable::onChildrenStopped()
{
}
//------------------------------------------------------------------------------
void
Stoppable::prepareRecursive()
{
for (Children::const_iterator iter(m_children.cbegin());
iter != m_children.cend();
++iter)
iter->stoppable->prepareRecursive();
onPrepare();
}
void
Stoppable::startRecursive()
{
onStart();
for (Children::const_iterator iter(m_children.cbegin());
iter != m_children.cend();
++iter)
iter->stoppable->startRecursive();
}
void
Stoppable::stopAsyncRecursive(beast::Journal j)
{
onStop();
for (Children::const_iterator iter(m_children.cbegin());
iter != m_children.cend();
++iter)
iter->stoppable->stopAsyncRecursive(j);
}
void
Stoppable::stopRecursive(beast::Journal j)
{
// Block on each child from the bottom of the tree up.
//
for (Children::const_iterator iter(m_children.cbegin());
iter != m_children.cend();
++iter)
iter->stoppable->stopRecursive(j);
// if we get here then all children have stopped
//
m_childrenStopped = true;
onChildrenStopped();
// Now block on this Stoppable until m_is_stopping is set by stopped().
//
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk{m_mut};
if (!m_cv.wait_for(lk, 1s, [this] { return m_is_stopping; }))
{
if (auto stream = j.error())
stream << "Waiting for '" << m_name << "' to stop";
m_cv.wait(lk, [this] { return m_is_stopping; });
}
m_stopped = true;
}
//------------------------------------------------------------------------------
RootStoppable::RootStoppable(std::string name)
: Stoppable(std::move(name), *this)
{
}
RootStoppable::~RootStoppable()
{
using namespace std::chrono_literals;
jobCounter_.join(m_name.c_str(), 1s, debugLog());
}
bool
RootStoppable::isStopping() const
{
return stopEntered_;
}
void
RootStoppable::start()
{
if (startEntered_.exchange(true))
return;
prepareRecursive();
startRecursive();
startExited_ = true;
}
void
RootStoppable::stop(beast::Journal j)
{
// Must have a prior call to start()
assert(startExited_);
bool alreadyCalled;
{
// Even though stopEntered_ is atomic, we change its value under a
// lock. This removes a small timing window that occurs if the
// waiting thread is handling a spurious wakeup while stopEntered_
// changes state.
std::unique_lock<std::mutex> lock(m_);
alreadyCalled = stopEntered_.exchange(true);
}
if (alreadyCalled)
{
if (auto stream = j.warn())
stream << "RootStoppable::stop called again";
return;
}
// Wait until all in-flight JobQueue Jobs are completed.
using namespace std::chrono_literals;
jobCounter_.join(m_name.c_str(), 1s, j);
c_.notify_all();
stopAsyncRecursive(j);
stopRecursive(j);
}
} // namespace ripple

View File

@@ -44,7 +44,7 @@ Workers::Workers(
Workers::~Workers()
{
pauseAllThreadsAndWait();
stop();
deleteWorkers(m_everyone);
}
@@ -111,7 +111,7 @@ Workers::setNumberOfThreads(int numberOfThreads)
}
void
Workers::pauseAllThreadsAndWait()
Workers::stop()
{
setNumberOfThreads(0);

View File

@@ -34,7 +34,36 @@ namespace perf {
class PerfLog;
}
/** A group of threads that process tasks.
/**
* `Workers` is effectively a thread pool. The constructor takes a "callback"
* that has a `void processTask(int instance)` method, and a number of
* workers. It creates that many Workers and then waits for calls to
* `Workers::addTask()`. It holds a semaphore that counts the number of
* waiting tasks, and a condition variable for the event when the last worker
* pauses itself.
*
* Creating a `Worker` creates a thread that calls `Worker::run()`. When that
* thread enters `Worker::run`, it increments the count of active workers in
* the parent `Workers` object and then blocks on the semaphore if there are
* no waiting tasks. It will be unblocked whenever the number of waiting tasks
* is incremented. That only happens in two circumstances: (1) when
* `Workers::addTask` is called and (2) when `Workers` wants to pause some
* workers ("pause one worker" is considered one task), which happens when
* someone wants to stop the workers or shrink the threadpool. No worker
* threads are ever destroyed until `Workers` is destroyed; it merely pauses
* workers until then.
*
* When an idle worker is woken, it checks whether `Workers` is trying to pause
* workers. If so, it adds itself to the set of paused workers and blocks on
* its own condition variable. If not, then it calls `processTask` on the
* "callback" held by `Workers`.
*
* When a paused worker is woken, it checks whether it should exit. The signal
* to exit is only set in the destructor of `Worker`, which unblocks the
* paused thread and waits for it to exit. A `Worker::run` thread checks
* whether it needs to exit only when it is woken from a pause (not when it is
* woken from idle). This is why the destructor for `Workers` pauses all the
* workers before destroying them.
*/
class Workers
{
@@ -104,7 +133,7 @@ public:
@note This function is not thread-safe.
*/
void
pauseAllThreadsAndWait();
stop();
/** Add a task to be performed.