Check suspended coros for JobQueue stop condition

This commit is contained in:
Vinnie Falco
2016-02-26 15:07:46 -05:00
parent 73df97f2d0
commit 94a47569d6
3 changed files with 19 additions and 2 deletions

View File

@@ -34,7 +34,7 @@ JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type,
(boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield)
{
yield_ = &do_yield;
(*yield_)();
yield();
fn(shared_from_this());
}, boost::coroutines::attributes (1024 * 1024))
{
@@ -44,6 +44,10 @@ inline
void
JobCoro::yield() const
{
{
std::lock_guard<std::mutex> lock(jq_.m_mutex);
++jq_.nSuspend_;
}
(*yield_)();
}
@@ -60,6 +64,10 @@ JobCoro::post()
jq_.addJob(type_, name_,
[this, sp = shared_from_this()](Job&)
{
{
std::lock_guard<std::mutex> lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard<std::mutex> lock(mutex_);

View File

@@ -36,6 +36,8 @@ namespace ripple {
class Logs;
/** A pool of threads to perform work.
*/
class JobQueue
: public beast::Stoppable
, private beast::Workers::Callback
@@ -101,6 +103,8 @@ public:
rendezvous();
private:
friend class JobCoro;
using JobDataMap = std::map <JobType, JobTypeData>;
beast::Journal m_journal;
@@ -113,6 +117,9 @@ private:
// The number of jobs currently in processTask()
int m_processCount;
// The number of suspended coroutines
int nSuspend_ = 0;
beast::Workers m_workers;
Job::CancelCallback m_cancelCallback;

View File

@@ -339,11 +339,13 @@ JobQueue::checkStopped (std::lock_guard <std::mutex> const& lock)
// 2. All Stoppable children have stopped
// 3. There are no executing calls to processTask
// 4. There are no remaining Jobs in the job set
// 5. There are no suspended coroutines
//
if (isStopping() &&
areChildrenStopped() &&
(m_processCount == 0) &&
m_jobSet.empty())
m_jobSet.empty() &&
nSuspend_ == 0)
{
stopped();
}