Prevent misuse of JobQueue header files:

* Move `JobCoro` to `JobQueue::Coro` and remove separate JobCoro.h
This commit is contained in:
Miguel Portilla
2016-10-28 13:12:00 -04:00
committed by Nik Bougalis
parent afd4b45036
commit 4b261b12a4
13 changed files with 144 additions and 184 deletions

View File

@@ -1784,6 +1784,8 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\ConfigSections.h">
</ClInclude>
<None Include="..\..\src\ripple\core\Coro.ipp">
</None>
<ClInclude Include="..\..\src\ripple\core\DatabaseCon.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\DeadlineTimer.h">
@@ -1880,10 +1882,6 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\Job.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\JobCoro.h">
</ClInclude>
<None Include="..\..\src\ripple\core\JobCoro.ipp">
</None>
<ClInclude Include="..\..\src\ripple\core\JobQueue.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\JobTypeData.h">

View File

@@ -2415,6 +2415,9 @@
<ClInclude Include="..\..\src\ripple\core\ConfigSections.h">
<Filter>ripple\core</Filter>
</ClInclude>
<None Include="..\..\src\ripple\core\Coro.ipp">
<Filter>ripple\core</Filter>
</None>
<ClInclude Include="..\..\src\ripple\core\DatabaseCon.h">
<Filter>ripple\core</Filter>
</ClInclude>
@@ -2475,12 +2478,6 @@
<ClInclude Include="..\..\src\ripple\core\Job.h">
<Filter>ripple\core</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\JobCoro.h">
<Filter>ripple\core</Filter>
</ClInclude>
<None Include="..\..\src\ripple\core\JobCoro.ipp">
<Filter>ripple\core</Filter>
</None>
<ClInclude Include="..\..\src\ripple\core\JobQueue.h">
<Filter>ripple\core</Filter>
</ClInclude>

View File

@@ -17,13 +17,14 @@
*/
//==============================================================================
#ifndef RIPPLE_CORE_JOBCOROINL_H_INCLUDED
#define RIPPLE_CORE_JOBCOROINL_H_INCLUDED
#ifndef RIPPLE_CORE_COROINL_H_INCLUDED
#define RIPPLE_CORE_COROINL_H_INCLUDED
namespace ripple {
template <class F>
JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type,
JobQueue::Coro::
Coro(Coro_create_t, JobQueue& jq, JobType type,
std::string const& name, F&& f)
: jq_(jq)
, type_(type)
@@ -44,14 +45,16 @@ JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type,
}
inline
JobCoro::~JobCoro()
JobQueue::Coro::
~Coro()
{
assert(finished_);
}
inline
void
JobCoro::yield() const
JobQueue::Coro::
yield() const
{
{
std::lock_guard<std::mutex> lock(jq_.m_mutex);
@@ -62,7 +65,8 @@ JobCoro::yield() const
inline
void
JobCoro::post()
JobQueue::Coro::
post()
{
{
std::lock_guard<std::mutex> lk(mutex_run_);
@@ -91,7 +95,8 @@ JobCoro::post()
inline
void
JobCoro::join()
JobQueue::Coro::
join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk,

View File

@@ -1,99 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_CORE_JOBCORO_H_INCLUDED
#define RIPPLE_CORE_JOBCORO_H_INCLUDED
#include <ripple/basics/win32_workaround.h>
#include <ripple/core/Job.h>
#include <ripple/basics/LocalValue.h>
#include <boost/coroutine/all.hpp>
#include <condition_variable>
#include <string>
#include <memory>
#include <mutex>
namespace ripple {
class JobQueue;
namespace detail {
struct JobCoro_create_t { };
} // detail
/** Coroutines must run to completion. */
class JobCoro : public std::enable_shared_from_this<JobCoro>
{
private:
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
// Private: Used in the implementation
template <class F>
JobCoro(detail::JobCoro_create_t, JobQueue&, JobType,
std::string const&, F&&);
// Not copy-constructible or assignable
JobCoro(JobCoro const&) = delete;
JobCoro& operator= (JobCoro const&) = delete;
~JobCoro();
/** Suspend coroutine execution.
Effects:
The coroutine's stack is saved.
The associated Job thread is released.
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding post.
*/
void yield() const;
/** Schedule coroutine execution.
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the statement
after the previous call to yield.
Undefined behavior if called consecutively without a corresponding yield.
*/
void post();
/** Waits until coroutine returns from the user function. */
void join();
};
} // ripple
#endif

View File

@@ -20,26 +20,31 @@
#ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#include <ripple/basics/LocalValue.h>
#include <ripple/basics/win32_workaround.h>
#include <ripple/core/Job.h>
#include <ripple/core/JobTypes.h>
#include <ripple/core/JobTypeData.h>
#include <ripple/core/JobCoro.h>
#include <ripple/core/impl/Workers.h>
#include <ripple/json/json_value.h>
#include <ripple/beast/insight/Collector.h>
#include <ripple/core/Stoppable.h>
#include <boost/coroutine/all.hpp>
#include <boost/function.hpp>
#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>
namespace ripple {
class Logs;
struct Coro_create_t {};
/** A pool of threads to perform work.
A job posted will always run to completion.
Coroutines that are suspended must be resumed,
and run to completion.
@@ -51,6 +56,61 @@ class JobQueue
, private Workers::Callback
{
public:
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
private:
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
// Private: Used in the implementation
template <class F>
Coro(Coro_create_t, JobQueue&, JobType,
std::string const&, F&&);
// Not copy-constructible or assignable
Coro(Coro const&) = delete;
Coro& operator= (Coro const&) = delete;
~Coro();
/** Suspend coroutine execution.
Effects:
The coroutine's stack is saved.
The associated Job thread is released.
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding post.
*/
void yield() const;
/** Schedule coroutine execution.
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the statement
after the previous call to yield.
Undefined behavior if called consecutively without a corresponding yield.
*/
void post();
/** Waits until coroutine returns from the user function. */
void join();
};
using JobFunction = std::function <void(Job&)>;
JobQueue (beast::insight::Collector::ptr const& collector,
@@ -63,7 +123,7 @@ public:
@param t The type of job.
@param name Name of the job.
@param f Has a signature of void(std::shared_ptr<JobCoro>). Called when the job executes.
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the job executes.
*/
template <class F>
void postCoro (JobType t, std::string const& name, F&& f);
@@ -111,7 +171,7 @@ public:
rendezvous();
private:
friend class JobCoro;
friend class Coro;
using JobDataMap = std::map <JobType, JobTypeData>;
@@ -235,7 +295,7 @@ private:
point. This frees up the handler thread and allows it to continue handling
other requests while the RPC command completes its work asynchronously.
postCoro() creates a JobCoro object. When the JobCoro ctor is called, and its
postCoro() creates a Coro object. When the Coro ctor is called, and its
coro_ member is initialized(a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note here
@@ -243,21 +303,21 @@ private:
coroutine. A pull_type object automatically generates a push_type that is
used as the as a parameter(do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, JobCoro::yield_ is assigned the push_type
construction and within it, Coro::yield_ is assigned the push_type
parameter(do_yield) address and called(yield()) so we can return execution
back to the caller's stack.
postCoro() then calls JobCoro::post(), which schedules a job on the job
postCoro() then calls Coro::post(), which schedules a job on the job
queue to continue execution of the coroutine in a JobQueue worker thread at
some later time. When the job runs, we lock on the JobCoro::mutex_ and call
some later time. When the job runs, we lock on the Coro::mutex_ and call
coro_ which continues where we had left off. Since we the last thing we did
in coro_ was call yield(), the next thing we continue with is calling the
function param f, that was passed into JobCoro ctor. It is within this
function param f, that was passed into Coro ctor. It is within this
function body that the caller specifies what he would like to do while
running in the coroutine and allow them to suspend and resume execution.
A task that relies on other events to complete, such as path finding, calls
JobCoro::yield() to suspend its execution while waiting on those events to
complete and continue when signaled via the JobCoro::post() method.
Coro::yield() to suspend its execution while waiting on those events to
complete and continue when signaled via the Coro::post() method.
There is a potential race condition that exists here where post() can get
called before yield() after f is called. Technically the problem only occurs
@@ -288,7 +348,7 @@ private:
} // ripple
#include <ripple/core/JobCoro.ipp>
#include <ripple/core/Coro.ipp>
namespace ripple {
@@ -297,10 +357,10 @@ void JobQueue::postCoro (JobType t, std::string const& name, F&& f)
{
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<JobCoro>).
void(std::shared_ptr<Coro>).
*/
auto const coro = std::make_shared<JobCoro>(
detail::JobCoro_create_t{}, *this, t, name, std::forward<F>(f));
auto const coro = std::make_shared<Coro>(
Coro_create_t{}, *this, t, name, std::forward<F>(f));
coro->post();
}

View File

@@ -21,7 +21,7 @@
#define RIPPLE_RPC_CONTEXT_H_INCLUDED
#include <ripple/core/Config.h>
#include <ripple/core/JobCoro.h>
#include <ripple/core/JobQueue.h>
#include <ripple/net/InfoSub.h>
#include <ripple/rpc/Role.h>
@@ -55,7 +55,7 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobCoro> jobCoro;
std::shared_ptr<JobQueue::Coro> coro;
InfoSub::pointer infoSub;
Headers headers;
};

View File

@@ -56,11 +56,11 @@ Json::Value doRipplePathFind (RPC::Context& context)
lpLedger = context.ledgerMaster.getClosedLedger();
jvResult = context.app.getPathRequests().makeLegacyPathRequest (
request, std::bind(&JobCoro::post, context.jobCoro),
request, std::bind(&JobQueue::Coro::post, context.coro),
context.consumer, lpLedger, context.params);
if (request)
{
context.jobCoro->yield();
context.coro->yield();
jvResult = request->doStatus (context.params);
}

View File

@@ -316,9 +316,9 @@ ServerHandlerImp::onRequest (Session& session)
}
m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
[this, detach = session.detach()](std::shared_ptr<JobCoro> jc)
[this, detach = session.detach()](std::shared_ptr<JobQueue::Coro> c)
{
processSession(detach, jc);
processSession(detach, c);
});
}
@@ -358,10 +358,10 @@ ServerHandlerImp::onWSMessage(
m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session = std::move(session),
jv = std::move(jv)](auto const& jc)
jv = std::move(jv)](auto const& c)
{
auto const jr =
this->processSession(session, jc, jv);
this->processSession(session, c, jv);
auto const s = to_string(jr);
auto const n = s.length();
beast::streambuf sb(n);
@@ -392,7 +392,7 @@ ServerHandlerImp::onStopped (Server&)
Json::Value
ServerHandlerImp::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobCoro> const& coro,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub> (session->appDefined);
@@ -482,13 +482,13 @@ ServerHandlerImp::processSession(
// Run as a coroutine.
void
ServerHandlerImp::processSession (std::shared_ptr<Session> const& session,
std::shared_ptr<JobCoro> jobCoro)
std::shared_ptr<JobQueue::Coro> coro)
{
processRequest (
session->port(), buffers_to_string(
session->request().body.data()),
session->remoteAddress().at_port (0),
makeOutput (*session), jobCoro,
makeOutput (*session), coro,
[&]
{
auto const iter =
@@ -517,7 +517,7 @@ ServerHandlerImp::processSession (std::shared_ptr<Session> const& session,
void
ServerHandlerImp::processRequest (Port const& port,
std::string const& request, beast::IP::Endpoint const& remoteIPAddress,
Output&& output, std::shared_ptr<JobCoro> jobCoro,
Output&& output, std::shared_ptr<JobQueue::Coro> coro,
std::string forwardedFor, std::string user)
{
auto rpcJ = app_.journal ("RPC");
@@ -650,7 +650,7 @@ ServerHandlerImp::processRequest (Port const& port,
auto const start (std::chrono::high_resolution_clock::now ());
RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
app_.getLedgerMaster(), usage, role, jobCoro, InfoSub::pointer(),
app_.getLedgerMaster(), usage, role, coro, InfoSub::pointer(),
{user, forwardedFor}};
Json::Value result;
RPC::doCommand (context, result);

View File

@@ -20,8 +20,7 @@
#ifndef RIPPLE_RPC_SERVERHANDLERIMP_H_INCLUDED
#define RIPPLE_RPC_SERVERHANDLERIMP_H_INCLUDED
#include <ripple/core/Job.h>
#include <ripple/core/JobCoro.h>
#include <ripple/core/JobQueue.h>
#include <ripple/rpc/impl/WSInfoSub.h>
#include <ripple/server/Server.h>
#include <ripple/server/Session.h>
@@ -153,17 +152,17 @@ private:
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobCoro> const& coro,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
void
processSession (std::shared_ptr<Session> const&,
std::shared_ptr<JobCoro> jobCoro);
std::shared_ptr<JobQueue::Coro> coro);
void
processRequest (Port const& port, std::string const& request,
beast::IP::Endpoint const& remoteIPAddress, Output&&,
std::shared_ptr<JobCoro> jobCoro,
std::shared_ptr<JobQueue::Coro> coro,
std::string forwardedFor, std::string user);
Handoff

View File

@@ -24,7 +24,7 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/Log.h>
#include <ripple/core/JobCoro.h>
#include <ripple/core/JobQueue.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/net/RPCErr.h>
@@ -97,7 +97,7 @@ public:
boost::optional <std::string> getMessage ();
bool checkMessage ();
Json::Value invokeCommand (Json::Value const& jvRequest,
std::shared_ptr<JobCoro> jobCoro);
std::shared_ptr<JobQueue::Coro> coro);
// Generically implemented per version.
void setPingTimer ();
@@ -243,7 +243,7 @@ ConnectionImpl <WebSocket>::getMessage ()
template <class WebSocket>
Json::Value ConnectionImpl <WebSocket>::invokeCommand (
Json::Value const& jvRequest, std::shared_ptr<JobCoro> jobCoro)
Json::Value const& jvRequest, std::shared_ptr<JobQueue::Coro> coro)
{
if (getConsumer().disconnect ())
{
@@ -287,7 +287,7 @@ Json::Value ConnectionImpl <WebSocket>::invokeCommand (
{
RPC::Context context {app_.journal ("RPCHandler"), jvRequest,
app_, loadType, m_netOPs, app_.getLedgerMaster(), getConsumer(),
role, jobCoro, this->shared_from_this(),
role, coro, this->shared_from_this(),
{m_user, m_forwardedFor}};
RPC::doCommand (context, jvResult[jss::result]);
}

View File

@@ -308,9 +308,9 @@ public:
{
app_.getJobQueue().postCoro(jtCLIENT, "WSClient",
[this, cpClient]
(auto const& jc)
(auto const& c)
{
this->do_messages(jc, cpClient);
this->do_messages(c, cpClient);
});
}
@@ -348,7 +348,7 @@ public:
message_job("command", cpClient);
}
void do_messages (std::shared_ptr<JobCoro> const& jc,
void do_messages (std::shared_ptr<JobQueue::Coro> const& c,
connection_ptr const& cpClient)
{
wsc_ptr ptr;
@@ -374,7 +374,7 @@ public:
if (! msg)
return;
do_message(jc, cpClient, ptr, *msg);
do_message(c, cpClient, ptr, *msg);
}
if (ptr->checkMessage ())
@@ -382,7 +382,7 @@ public:
}
void
do_message (std::shared_ptr<JobCoro> const& jc,
do_message (std::shared_ptr<JobQueue::Coro> const& c,
const connection_ptr cpClient, wsc_ptr conn,
const std::string& message)
{
@@ -415,7 +415,7 @@ public:
{
using namespace std::chrono;
auto const start = high_resolution_clock::now();
auto buffer = to_string(conn->invokeCommand(jvRequest, jc));
auto buffer = to_string(conn->invokeCommand(jvRequest, c));
rpc_time_.notify (
static_cast <beast::insight::Event::value_type> (
duration_cast <milliseconds> (

View File

@@ -233,7 +233,7 @@ public:
[&](auto const& coro)
{
context.params = std::move (params);
context.jobCoro = coro;
context.coro = coro;
RPC::doCommand (context, result);
g.signal();
});
@@ -303,7 +303,7 @@ public:
{
context.params = rpf(Account("alice"), Account("bob"),
RPC::Tuning::max_src_cur);
context.jobCoro = coro;
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
@@ -316,7 +316,7 @@ public:
{
context.params = rpf(Account("alice"), Account("bob"),
RPC::Tuning::max_src_cur + 1);
context.jobCoro = coro;
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
@@ -330,7 +330,7 @@ public:
[&](auto const& coro)
{
context.params = rpf(Account("alice"), Account("bob"), 0);
context.jobCoro = coro;
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
@@ -343,7 +343,7 @@ public:
[&](auto const& coro)
{
context.params = rpf(Account("alice"), Account("bob"), 0);
context.jobCoro = coro;
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});

View File

@@ -68,18 +68,18 @@ public:
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
gate g1, g2;
std::shared_ptr<JobCoro> jc;
std::shared_ptr<JobQueue::Coro> c;
jq.postCoro(jtCLIENT, "Coroutine-Test",
[&](auto const& jcr)
[&](auto const& cr)
{
jc = jcr;
c = cr;
g1.signal();
jc->yield();
c->yield();
g2.signal();
});
BEAST_EXPECT(g1.wait_for(5s));
jc->join();
jc->post();
c->join();
c->post();
BEAST_EXPECT(g2.wait_for(5s));
}
@@ -93,10 +93,10 @@ public:
jq.setThreadCount(0, false);
gate g;
jq.postCoro(jtCLIENT, "Coroutine-Test",
[&](auto const& jc)
[&](auto const& c)
{
jc->post();
jc->yield();
c->post();
c->yield();
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
@@ -111,7 +111,7 @@ public:
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, true);
static int const N = 4;
std::array<std::shared_ptr<JobCoro>, N> a;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -131,33 +131,33 @@ public:
for(int i = 0; i < N; ++i)
{
jq.postCoro(jtCLIENT, "Coroutine-Test",
[&, id = i](auto const& jc)
[&, id = i](auto const& c)
{
a[id] = jc;
a[id] = c;
g.signal();
jc->yield();
c->yield();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
jc->yield();
c->yield();
this->BEAST_EXPECT(*lv == id);
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for(auto const& jc : a)
for(auto const& c : a)
{
jc->post();
c->post();
BEAST_EXPECT(g.wait_for(5s));
jc->join();
c->join();
}
for(auto const& jc : a)
for(auto const& c : a)
{
jc->post();
jc->join();
c->post();
c->join();
}
jq.addJob(jtCLIENT, "LocalValue-Test",