Add JobCoro::join

This commit is contained in:
Miguel Portilla
2015-12-23 14:00:56 -05:00
committed by Edward Hennis
parent 49c86768e6
commit f73c55a922
2 changed files with 38 additions and 8 deletions

View File

@@ -23,6 +23,7 @@
#include <ripple/core/Job.h>
#include <beast/win32_workaround.h>
#include <boost/coroutine/all.hpp>
#include <condition_variable>
#include <string>
#include <mutex>
@@ -42,14 +43,17 @@ private:
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
public:
// Private: Used in the implementation
template <class F>
JobCoro (detail::JobCoro_create_t, JobQueue&, JobType,
JobCoro(detail::JobCoro_create_t, JobQueue&, JobType,
std::string const&, F&&);
/** Suspend coroutine execution.
@@ -60,9 +64,9 @@ public:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding post.
*/
void yield () const;
void yield() const;
/** Schedule coroutine execution
/** Schedule coroutine execution.
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
@@ -71,7 +75,10 @@ public:
after the previous call to yield.
Undefined behavior if called consecutively without a corresponding yield.
*/
void post ();
void post();
/** Waits until coroutine returns from the user function. */
void join();
};
} // ripple

View File

@@ -23,11 +23,12 @@
namespace ripple {
template <class F>
JobCoro::JobCoro (detail::JobCoro_create_t, JobQueue& jq, JobType type,
JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type,
std::string const& name, F&& f)
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)]
(boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield)
@@ -41,21 +42,43 @@ JobCoro::JobCoro (detail::JobCoro_create_t, JobQueue& jq, JobType type,
inline
void
JobCoro::yield () const
JobCoro::yield() const
{
(*yield_)();
}
inline
void
JobCoro::post ()
JobCoro::post()
{
{
std::lock_guard<std::mutex> lk(mutex_run_);
running_ = true;
}
// sp keeps 'this' alive
jq_.addJob(type_, name_,
[this, sp = shared_from_this()](Job&)
{
std::lock_guard<std::mutex> lock (mutex_);
std::lock_guard<std::mutex> lock(mutex_);
context_sp().reset(&ctx_);
coro_();
std::lock_guard<std::mutex> lk(mutex_run_);
running_ = false;
cv_.notify_all();
context_sp().reset(nullptr);
});
}
inline
void
JobCoro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk,
[this]()
{
return running_ == false;
});
}