diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index 40d06f1df..b12cc027e 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -1784,6 +1784,8 @@
+
+
@@ -1880,10 +1882,6 @@
-
-
-
-
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index 8a1925f80..38f373d98 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -2415,6 +2415,9 @@
ripple\core
+
+ ripple\core
+
ripple\core
@@ -2475,12 +2478,6 @@
ripple\core
-
- ripple\core
-
-
- ripple\core
-
ripple\core
diff --git a/src/ripple/core/JobCoro.ipp b/src/ripple/core/Coro.ipp
similarity index 91%
rename from src/ripple/core/JobCoro.ipp
rename to src/ripple/core/Coro.ipp
index da9f0413f..8eeadd32f 100644
--- a/src/ripple/core/JobCoro.ipp
+++ b/src/ripple/core/Coro.ipp
@@ -17,13 +17,14 @@
*/
//==============================================================================
-#ifndef RIPPLE_CORE_JOBCOROINL_H_INCLUDED
-#define RIPPLE_CORE_JOBCOROINL_H_INCLUDED
+#ifndef RIPPLE_CORE_COROINL_H_INCLUDED
+#define RIPPLE_CORE_COROINL_H_INCLUDED
namespace ripple {
template
-JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type,
+JobQueue::Coro::
+Coro(Coro_create_t, JobQueue& jq, JobType type,
std::string const& name, F&& f)
: jq_(jq)
, type_(type)
@@ -44,14 +45,16 @@ JobCoro::JobCoro(detail::JobCoro_create_t, JobQueue& jq, JobType type,
}
inline
-JobCoro::~JobCoro()
+JobQueue::Coro::
+~Coro()
{
assert(finished_);
}
inline
void
-JobCoro::yield() const
+JobQueue::Coro::
+yield() const
{
{
std::lock_guard lock(jq_.m_mutex);
@@ -62,7 +65,8 @@ JobCoro::yield() const
inline
void
-JobCoro::post()
+JobQueue::Coro::
+post()
{
{
std::lock_guard lk(mutex_run_);
@@ -91,7 +95,8 @@ JobCoro::post()
inline
void
-JobCoro::join()
+JobQueue::Coro::
+join()
{
std::unique_lock lk(mutex_run_);
cv_.wait(lk,
diff --git a/src/ripple/core/JobCoro.h b/src/ripple/core/JobCoro.h
deleted file mode 100644
index 9a762bd65..000000000
--- a/src/ripple/core/JobCoro.h
+++ /dev/null
@@ -1,99 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#ifndef RIPPLE_CORE_JOBCORO_H_INCLUDED
-#define RIPPLE_CORE_JOBCORO_H_INCLUDED
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-namespace ripple {
-
-class JobQueue;
-
-namespace detail {
-
-struct JobCoro_create_t { };
-
-} // detail
-
-/** Coroutines must run to completion. */
-class JobCoro : public std::enable_shared_from_this
-{
-private:
- detail::LocalValues lvs_;
- JobQueue& jq_;
- JobType type_;
- std::string name_;
- bool running_;
- std::mutex mutex_;
- std::mutex mutex_run_;
- std::condition_variable cv_;
- boost::coroutines::asymmetric_coroutine::pull_type coro_;
- boost::coroutines::asymmetric_coroutine::push_type* yield_;
-#ifndef NDEBUG
- bool finished_ = false;
-#endif
-
-public:
- // Private: Used in the implementation
- template
- JobCoro(detail::JobCoro_create_t, JobQueue&, JobType,
- std::string const&, F&&);
-
- // Not copy-constructible or assignable
- JobCoro(JobCoro const&) = delete;
- JobCoro& operator= (JobCoro const&) = delete;
-
- ~JobCoro();
-
- /** Suspend coroutine execution.
- Effects:
- The coroutine's stack is saved.
- The associated Job thread is released.
- Note:
- The associated Job function returns.
- Undefined behavior if called consecutively without a corresponding post.
- */
- void yield() const;
-
- /** Schedule coroutine execution.
- Effects:
- Returns immediately.
- A new job is scheduled to resume the execution of the coroutine.
- When the job runs, the coroutine's stack is restored and execution
- continues at the beginning of coroutine function or the statement
- after the previous call to yield.
- Undefined behavior if called consecutively without a corresponding yield.
- */
- void post();
-
- /** Waits until coroutine returns from the user function. */
- void join();
-};
-
-} // ripple
-
-#endif
diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h
index 9d3596b74..c7d590e74 100644
--- a/src/ripple/core/JobQueue.h
+++ b/src/ripple/core/JobQueue.h
@@ -20,26 +20,31 @@
#ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
+#include
+#include
+#include
#include
#include
-#include
#include
#include
#include
#include
+#include
#include
#include
+#include
#include
#include
namespace ripple {
class Logs;
+struct Coro_create_t {};
/** A pool of threads to perform work.
A job posted will always run to completion.
-
+
Coroutines that are suspended must be resumed,
and run to completion.
@@ -51,6 +56,61 @@ class JobQueue
, private Workers::Callback
{
public:
+ /** Coroutines must run to completion. */
+ class Coro : public std::enable_shared_from_this
+ {
+ private:
+ detail::LocalValues lvs_;
+ JobQueue& jq_;
+ JobType type_;
+ std::string name_;
+ bool running_;
+ std::mutex mutex_;
+ std::mutex mutex_run_;
+ std::condition_variable cv_;
+ boost::coroutines::asymmetric_coroutine::pull_type coro_;
+ boost::coroutines::asymmetric_coroutine::push_type* yield_;
+ #ifndef NDEBUG
+ bool finished_ = false;
+ #endif
+
+ public:
+ // Private: Used in the implementation
+ template
+ Coro(Coro_create_t, JobQueue&, JobType,
+ std::string const&, F&&);
+
+ // Not copy-constructible or assignable
+ Coro(Coro const&) = delete;
+ Coro& operator= (Coro const&) = delete;
+
+ ~Coro();
+
+ /** Suspend coroutine execution.
+ Effects:
+ The coroutine's stack is saved.
+ The associated Job thread is released.
+ Note:
+ The associated Job function returns.
+ Undefined behavior if called consecutively without a corresponding post.
+ */
+ void yield() const;
+
+ /** Schedule coroutine execution.
+ Effects:
+ Returns immediately.
+ A new job is scheduled to resume the execution of the coroutine.
+ When the job runs, the coroutine's stack is restored and execution
+ continues at the beginning of coroutine function or the statement
+ after the previous call to yield.
+ Undefined behavior if called consecutively without a corresponding yield.
+ */
+ void post();
+
+ /** Waits until coroutine returns from the user function. */
+ void join();
+ };
+
using JobFunction = std::function ;
JobQueue (beast::insight::Collector::ptr const& collector,
@@ -63,7 +123,7 @@ public:
@param t The type of job.
@param name Name of the job.
- @param f Has a signature of void(std::shared_ptr). Called when the job executes.
+ @param f Has a signature of void(std::shared_ptr). Called when the job executes.
*/
template
void postCoro (JobType t, std::string const& name, F&& f);
@@ -111,7 +171,7 @@ public:
rendezvous();
private:
- friend class JobCoro;
+ friend class Coro;
using JobDataMap = std::map ;
@@ -235,7 +295,7 @@ private:
point. This frees up the handler thread and allows it to continue handling
other requests while the RPC command completes its work asynchronously.
- postCoro() creates a JobCoro object. When the JobCoro ctor is called, and its
+ postCoro() creates a Coro object. When the Coro ctor is called, and its
coro_ member is initialized(a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note here
@@ -243,21 +303,21 @@ private:
coroutine. A pull_type object automatically generates a push_type that is
used as the as a parameter(do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
- construction and within it, JobCoro::yield_ is assigned the push_type
+ construction and within it, Coro::yield_ is assigned the push_type
parameter(do_yield) address and called(yield()) so we can return execution
back to the caller's stack.
- postCoro() then calls JobCoro::post(), which schedules a job on the job
+ postCoro() then calls Coro::post(), which schedules a job on the job
queue to continue execution of the coroutine in a JobQueue worker thread at
- some later time. When the job runs, we lock on the JobCoro::mutex_ and call
+ some later time. When the job runs, we lock on the Coro::mutex_ and call
coro_ which continues where we had left off. Since we the last thing we did
in coro_ was call yield(), the next thing we continue with is calling the
- function param f, that was passed into JobCoro ctor. It is within this
+ function param f, that was passed into Coro ctor. It is within this
function body that the caller specifies what he would like to do while
running in the coroutine and allow them to suspend and resume execution.
A task that relies on other events to complete, such as path finding, calls
- JobCoro::yield() to suspend its execution while waiting on those events to
- complete and continue when signaled via the JobCoro::post() method.
+ Coro::yield() to suspend its execution while waiting on those events to
+ complete and continue when signaled via the Coro::post() method.
There is a potential race condition that exists here where post() can get
called before yield() after f is called. Technically the problem only occurs
@@ -288,7 +348,7 @@ private:
} // ripple
-#include
+#include
namespace ripple {
@@ -297,10 +357,10 @@ void JobQueue::postCoro (JobType t, std::string const& name, F&& f)
{
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
- void(std::shared_ptr).
+ void(std::shared_ptr).
*/
- auto const coro = std::make_shared(
- detail::JobCoro_create_t{}, *this, t, name, std::forward(f));
+ auto const coro = std::make_shared(
+ Coro_create_t{}, *this, t, name, std::forward(f));
coro->post();
}
diff --git a/src/ripple/rpc/Context.h b/src/ripple/rpc/Context.h
index 4838b995a..d545dcd4b 100644
--- a/src/ripple/rpc/Context.h
+++ b/src/ripple/rpc/Context.h
@@ -21,7 +21,7 @@
#define RIPPLE_RPC_CONTEXT_H_INCLUDED
#include
-#include
+#include
#include
#include
@@ -55,7 +55,7 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
- std::shared_ptr jobCoro;
+ std::shared_ptr coro;
InfoSub::pointer infoSub;
Headers headers;
};
diff --git a/src/ripple/rpc/handlers/RipplePathFind.cpp b/src/ripple/rpc/handlers/RipplePathFind.cpp
index 59ffdde88..0e59886fe 100644
--- a/src/ripple/rpc/handlers/RipplePathFind.cpp
+++ b/src/ripple/rpc/handlers/RipplePathFind.cpp
@@ -56,11 +56,11 @@ Json::Value doRipplePathFind (RPC::Context& context)
lpLedger = context.ledgerMaster.getClosedLedger();
jvResult = context.app.getPathRequests().makeLegacyPathRequest (
- request, std::bind(&JobCoro::post, context.jobCoro),
+ request, std::bind(&JobQueue::Coro::post, context.coro),
context.consumer, lpLedger, context.params);
if (request)
{
- context.jobCoro->yield();
+ context.coro->yield();
jvResult = request->doStatus (context.params);
}
diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp
index efd03a5ae..9ba8869e7 100644
--- a/src/ripple/rpc/impl/ServerHandlerImp.cpp
+++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp
@@ -316,9 +316,9 @@ ServerHandlerImp::onRequest (Session& session)
}
m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
- [this, detach = session.detach()](std::shared_ptr jc)
+ [this, detach = session.detach()](std::shared_ptr c)
{
- processSession(detach, jc);
+ processSession(detach, c);
});
}
@@ -358,10 +358,10 @@ ServerHandlerImp::onWSMessage(
m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session = std::move(session),
- jv = std::move(jv)](auto const& jc)
+ jv = std::move(jv)](auto const& c)
{
auto const jr =
- this->processSession(session, jc, jv);
+ this->processSession(session, c, jv);
auto const s = to_string(jr);
auto const n = s.length();
beast::streambuf sb(n);
@@ -392,7 +392,7 @@ ServerHandlerImp::onStopped (Server&)
Json::Value
ServerHandlerImp::processSession(
std::shared_ptr const& session,
- std::shared_ptr const& coro,
+ std::shared_ptr const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast (session->appDefined);
@@ -482,13 +482,13 @@ ServerHandlerImp::processSession(
// Run as a coroutine.
void
ServerHandlerImp::processSession (std::shared_ptr const& session,
- std::shared_ptr jobCoro)
+ std::shared_ptr coro)
{
processRequest (
session->port(), buffers_to_string(
session->request().body.data()),
session->remoteAddress().at_port (0),
- makeOutput (*session), jobCoro,
+ makeOutput (*session), coro,
[&]
{
auto const iter =
@@ -517,7 +517,7 @@ ServerHandlerImp::processSession (std::shared_ptr const& session,
void
ServerHandlerImp::processRequest (Port const& port,
std::string const& request, beast::IP::Endpoint const& remoteIPAddress,
- Output&& output, std::shared_ptr jobCoro,
+ Output&& output, std::shared_ptr coro,
std::string forwardedFor, std::string user)
{
auto rpcJ = app_.journal ("RPC");
@@ -650,7 +650,7 @@ ServerHandlerImp::processRequest (Port const& port,
auto const start (std::chrono::high_resolution_clock::now ());
RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
- app_.getLedgerMaster(), usage, role, jobCoro, InfoSub::pointer(),
+ app_.getLedgerMaster(), usage, role, coro, InfoSub::pointer(),
{user, forwardedFor}};
Json::Value result;
RPC::doCommand (context, result);
diff --git a/src/ripple/rpc/impl/ServerHandlerImp.h b/src/ripple/rpc/impl/ServerHandlerImp.h
index 45ea0daf9..a94ccda88 100644
--- a/src/ripple/rpc/impl/ServerHandlerImp.h
+++ b/src/ripple/rpc/impl/ServerHandlerImp.h
@@ -20,8 +20,7 @@
#ifndef RIPPLE_RPC_SERVERHANDLERIMP_H_INCLUDED
#define RIPPLE_RPC_SERVERHANDLERIMP_H_INCLUDED
-#include
-#include
+#include
#include
#include
#include
@@ -153,17 +152,17 @@ private:
Json::Value
processSession(
std::shared_ptr const& session,
- std::shared_ptr const& coro,
+ std::shared_ptr const& coro,
Json::Value const& jv);
void
processSession (std::shared_ptr const&,
- std::shared_ptr jobCoro);
+ std::shared_ptr coro);
void
processRequest (Port const& port, std::string const& request,
beast::IP::Endpoint const& remoteIPAddress, Output&&,
- std::shared_ptr jobCoro,
+ std::shared_ptr coro,
std::string forwardedFor, std::string user);
Handoff
diff --git a/src/ripple/websocket/Connection.h b/src/ripple/websocket/Connection.h
index 696e977a9..307980566 100644
--- a/src/ripple/websocket/Connection.h
+++ b/src/ripple/websocket/Connection.h
@@ -24,7 +24,7 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
@@ -97,7 +97,7 @@ public:
boost::optional getMessage ();
bool checkMessage ();
Json::Value invokeCommand (Json::Value const& jvRequest,
- std::shared_ptr jobCoro);
+ std::shared_ptr coro);
// Generically implemented per version.
void setPingTimer ();
@@ -243,7 +243,7 @@ ConnectionImpl ::getMessage ()
template
Json::Value ConnectionImpl ::invokeCommand (
- Json::Value const& jvRequest, std::shared_ptr jobCoro)
+ Json::Value const& jvRequest, std::shared_ptr coro)
{
if (getConsumer().disconnect ())
{
@@ -287,7 +287,7 @@ Json::Value ConnectionImpl ::invokeCommand (
{
RPC::Context context {app_.journal ("RPCHandler"), jvRequest,
app_, loadType, m_netOPs, app_.getLedgerMaster(), getConsumer(),
- role, jobCoro, this->shared_from_this(),
+ role, coro, this->shared_from_this(),
{m_user, m_forwardedFor}};
RPC::doCommand (context, jvResult[jss::result]);
}
diff --git a/src/ripple/websocket/Handler.h b/src/ripple/websocket/Handler.h
index cb5f70594..5efb86871 100644
--- a/src/ripple/websocket/Handler.h
+++ b/src/ripple/websocket/Handler.h
@@ -308,9 +308,9 @@ public:
{
app_.getJobQueue().postCoro(jtCLIENT, "WSClient",
[this, cpClient]
- (auto const& jc)
+ (auto const& c)
{
- this->do_messages(jc, cpClient);
+ this->do_messages(c, cpClient);
});
}
@@ -348,7 +348,7 @@ public:
message_job("command", cpClient);
}
- void do_messages (std::shared_ptr const& jc,
+ void do_messages (std::shared_ptr const& c,
connection_ptr const& cpClient)
{
wsc_ptr ptr;
@@ -374,7 +374,7 @@ public:
if (! msg)
return;
- do_message(jc, cpClient, ptr, *msg);
+ do_message(c, cpClient, ptr, *msg);
}
if (ptr->checkMessage ())
@@ -382,7 +382,7 @@ public:
}
void
- do_message (std::shared_ptr const& jc,
+ do_message (std::shared_ptr const& c,
const connection_ptr cpClient, wsc_ptr conn,
const std::string& message)
{
@@ -415,7 +415,7 @@ public:
{
using namespace std::chrono;
auto const start = high_resolution_clock::now();
- auto buffer = to_string(conn->invokeCommand(jvRequest, jc));
+ auto buffer = to_string(conn->invokeCommand(jvRequest, c));
rpc_time_.notify (
static_cast (
duration_cast (
diff --git a/src/test/app/Path_test.cpp b/src/test/app/Path_test.cpp
index f4a562e32..0e821b3da 100644
--- a/src/test/app/Path_test.cpp
+++ b/src/test/app/Path_test.cpp
@@ -233,7 +233,7 @@ public:
[&](auto const& coro)
{
context.params = std::move (params);
- context.jobCoro = coro;
+ context.coro = coro;
RPC::doCommand (context, result);
g.signal();
});
@@ -303,7 +303,7 @@ public:
{
context.params = rpf(Account("alice"), Account("bob"),
RPC::Tuning::max_src_cur);
- context.jobCoro = coro;
+ context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
@@ -316,7 +316,7 @@ public:
{
context.params = rpf(Account("alice"), Account("bob"),
RPC::Tuning::max_src_cur + 1);
- context.jobCoro = coro;
+ context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
@@ -330,7 +330,7 @@ public:
[&](auto const& coro)
{
context.params = rpf(Account("alice"), Account("bob"), 0);
- context.jobCoro = coro;
+ context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
@@ -343,7 +343,7 @@ public:
[&](auto const& coro)
{
context.params = rpf(Account("alice"), Account("bob"), 0);
- context.jobCoro = coro;
+ context.coro = coro;
RPC::doCommand(context, result);
g.signal();
});
diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp
index 0e8c110f4..05f3cb504 100644
--- a/src/test/core/Coroutine_test.cpp
+++ b/src/test/core/Coroutine_test.cpp
@@ -68,18 +68,18 @@ public:
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
gate g1, g2;
- std::shared_ptr jc;
+ std::shared_ptr c;
jq.postCoro(jtCLIENT, "Coroutine-Test",
- [&](auto const& jcr)
+ [&](auto const& cr)
{
- jc = jcr;
+ c = cr;
g1.signal();
- jc->yield();
+ c->yield();
g2.signal();
});
BEAST_EXPECT(g1.wait_for(5s));
- jc->join();
- jc->post();
+ c->join();
+ c->post();
BEAST_EXPECT(g2.wait_for(5s));
}
@@ -93,10 +93,10 @@ public:
jq.setThreadCount(0, false);
gate g;
jq.postCoro(jtCLIENT, "Coroutine-Test",
- [&](auto const& jc)
+ [&](auto const& c)
{
- jc->post();
- jc->yield();
+ c->post();
+ c->yield();
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
@@ -111,7 +111,7 @@ public:
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, true);
static int const N = 4;
- std::array, N> a;
+ std::array, N> a;
LocalValue lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -131,33 +131,33 @@ public:
for(int i = 0; i < N; ++i)
{
jq.postCoro(jtCLIENT, "Coroutine-Test",
- [&, id = i](auto const& jc)
+ [&, id = i](auto const& c)
{
- a[id] = jc;
+ a[id] = c;
g.signal();
- jc->yield();
+ c->yield();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
- jc->yield();
+ c->yield();
this->BEAST_EXPECT(*lv == id);
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
- for(auto const& jc : a)
+ for(auto const& c : a)
{
- jc->post();
+ c->post();
BEAST_EXPECT(g.wait_for(5s));
- jc->join();
+ c->join();
}
- for(auto const& jc : a)
+ for(auto const& c : a)
{
- jc->post();
- jc->join();
+ c->post();
+ c->join();
}
jq.addJob(jtCLIENT, "LocalValue-Test",