From 94a47569d6085091d3e34a52ac36bf17d9064025 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 26 Feb 2016 15:07:46 -0500 Subject: [PATCH] Check suspended coros for JobQueue stop condition --- src/ripple/core/JobCoro.ipp | 10 +++++++++- src/ripple/core/JobQueue.h | 7 +++++++ src/ripple/core/impl/JobQueue.cpp | 4 +++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/ripple/core/JobCoro.ipp b/src/ripple/core/JobCoro.ipp index 727b9a5937..3083c3a01d 100644 --- a/src/ripple/core/JobCoro.ipp +++ b/src/ripple/core/JobCoro.ipp @@ -34,7 +34,7 @@ JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type, (boost::coroutines::asymmetric_coroutine::push_type& do_yield) { yield_ = &do_yield; - (*yield_)(); + yield(); fn(shared_from_this()); }, boost::coroutines::attributes (1024 * 1024)) { @@ -44,6 +44,10 @@ inline void JobCoro::yield() const { + { + std::lock_guard lock(jq_.m_mutex); + ++jq_.nSuspend_; + } (*yield_)(); } @@ -60,6 +64,10 @@ JobCoro::post() jq_.addJob(type_, name_, [this, sp = shared_from_this()](Job&) { + { + std::lock_guard lock(jq_.m_mutex); + --jq_.nSuspend_; + } auto saved = detail::getLocalValues().release(); detail::getLocalValues().reset(&lvs_); std::lock_guard lock(mutex_); diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index 7f5f2ddfb0..608d2f740f 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -36,6 +36,8 @@ namespace ripple { class Logs; +/** A pool of threads to perform work. +*/ class JobQueue : public beast::Stoppable , private beast::Workers::Callback @@ -101,6 +103,8 @@ public: rendezvous(); private: + friend class JobCoro; + using JobDataMap = std::map ; beast::Journal m_journal; @@ -113,6 +117,9 @@ private: // The number of jobs currently in processTask() int m_processCount; + // The number of suspended coroutines + int nSuspend_ = 0; + beast::Workers m_workers; Job::CancelCallback m_cancelCallback; diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index ac0bd6d7f9..c10c8c78b6 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -339,11 +339,13 @@ JobQueue::checkStopped (std::lock_guard const& lock) // 2. All Stoppable children have stopped // 3. There are no executing calls to processTask // 4. There are no remaining Jobs in the job set + // 5. There are no suspended coroutines // if (isStopping() && areChildrenStopped() && (m_processCount == 0) && - m_jobSet.empty()) + m_jobSet.empty() && + nSuspend_ == 0) { stopped(); }