Better suspend for continuation

This commit is contained in:
Tom Ritchford
2015-07-28 19:04:22 -04:00
committed by Nik Bougalis
parent b0a855a10e
commit 545b2fd6b1
13 changed files with 205 additions and 179 deletions

View File

@@ -42,20 +42,10 @@ struct Context
LedgerMaster& ledgerMaster;
Role role;
InfoSub::pointer infoSub;
Suspend suspend;
Callback yield;
JobQueueSuspender suspend;
NodeStore::ScopedMetrics metrics;
};
inline
void suspend(Context const& context, Continuation const& continuation)
{
if (context.suspend)
context.suspend(continuation);
else
continuation(doNothingCallback);
}
} // RPC
} // ripple

View File

@@ -25,27 +25,15 @@
namespace ripple {
namespace RPC {
/** SuspendCallback: a function that a Coroutine gives to the coroutine
scheduler so that it gets a callback with a Suspend when it runs.
*/
using SuspendCallback = std::function <void (Suspend const&)>;
/** Coroutine is a function that is given to the coroutine scheduler which
later gets called with a Suspend. A Coroutine can't be empty. */
using Coroutine = std::function <void (Suspend const&)>;
/** Runs a function that takes a SuspendCallback as a coroutine. */
class Coroutine
{
public:
explicit Coroutine (SuspendCallback const&);
~Coroutine();
/** Run as a coroutine. */
void runOnCoroutine(Coroutine const&);
/** Run the coroutine and guarantee completion. */
void run ();
private:
struct Impl;
std::shared_ptr <Impl> impl_;
Coroutine (std::shared_ptr <Impl> const&);
};
/** Run as coroutine if UseCoroutines::yes, otherwise run immediately. */
void runOnCoroutine(UseCoroutines, Coroutine const&);
} // RPC
} // ripple

View File

@@ -20,7 +20,7 @@ namespace:
using Callback = std::function <void ()>;
using Continuation = std::function <void (Callback const&)>;
using Suspend = std::function <void (Continuation const&)>;
using SuspendCallback = std::function <void (Suspend const&)>;
using Coroutine = std::function <void (Suspend const&)>;
A `Callback` is a generic 0-argument function. A given `Callback` might or might
not block. Unless otherwise advised, do not hold locks or any resource that
@@ -36,7 +36,7 @@ A `Suspend` is a function belonging to a `Coroutine`. A `Suspend` runs a
`Continuation`, passing it a `Callback` that continues execution of the
`Coroutine`.
And finally, a `SuspendCallback` is a `std::function` which is given a
And finally, a `Coroutine` is a `std::function` which is given a
`Suspend`. This is what the RPC handler gives to the coroutine manager,
expecting to get called back with a `Suspend` and to be able to start execution.
@@ -47,10 +47,10 @@ straight-forward.
1. The instance of `ServerHandler` receives an RPC request.
2. It creates a `SuspendCallback` and gives it to the coroutine manager.
2. It creates a `Coroutine` and gives it to the coroutine manager.
3. The coroutine manager creates a `Coroutine`, starts it up, and then calls
the `SuspendCallback` with a `Suspend`.
the `Coroutine` with a `Suspend`.
4. Now the RPC response starts to be calculated.

View File

@@ -28,6 +28,7 @@
namespace ripple {
class BasicConfig;
class JobQueue;
class Section;
@@ -37,19 +38,21 @@ namespace RPC {
the RPC yield mechanism works.
*/
/** Callback: do something and eventually return. */
/** Callback: do something and eventually return. Can't be empty. */
using Callback = std::function <void ()>;
/** A non-empty callback that does nothing. */
static
Callback const doNothingCallback ([] () {});
/** Continuation: do something, guarantee to eventually call Callback. */
/** Continuation: do something, guarantee to eventually call Callback.
Can't be empty. */
using Continuation = std::function <void (Callback const&)>;
/** Suspend: suspend execution, pending completion of a Continuation. */
/** Suspend: suspend execution, pending completion of a Continuation.
Can't be empty. */
using Suspend = std::function <void (Continuation const&)>;
/** A non-empty Suspend that immediately calls its callback. */
extern
Suspend const dontSuspend;
/** Wrap an Output so it yields after approximately `chunkSize` bytes.
chunkedYieldingOutput() only yields after a call to output(), so there might
@@ -75,11 +78,12 @@ private:
Callback const yield_;
};
enum class UseCoroutines {no, yes};
/** When do we yield when performing a ledger computation? */
struct YieldStrategy
{
enum class Streaming {no, yes};
enum class UseCoroutines {no, yes};
/** Is the data streamed, or generated monolithically? */
Streaming streaming = Streaming::no;
@@ -88,10 +92,6 @@ struct YieldStrategy
never yield. */
UseCoroutines useCoroutines = UseCoroutines::no;
/** How many bytes do we emit before yielding? 0 means "never yield due to
number of bytes sent". */
std::size_t byteYieldCount = 0;
/** How many accounts do we process before yielding? 0 means "never yield
due to number of accounts processed." */
std::size_t accountYieldCount = 0;
@@ -101,23 +101,32 @@ struct YieldStrategy
std::size_t transactionYieldCount = 0;
};
/** Create a yield strategy from a configuration Section. */
YieldStrategy makeYieldStrategy (Section const&);
/** Does a BasicConfig require the use of coroutines? */
UseCoroutines useCoroutines(BasicConfig const&);
/** Return a continuation that runs a Callback on a Job Queue with a given
name and JobType. */
Continuation callbackOnJobQueue (
JobQueue&, std::string const& jobName, JobType);
/** Create a yield strategy from a BasicConfig. */
YieldStrategy makeYieldStrategy(BasicConfig const&);
/** Return a Callback that will suspend and then run a continuation. */
inline
Callback suspendForContinuation (
Suspend const& suspend, Continuation const& continuation)
/** JobQueueSuspender is a suspend, with a yield that reschedules the job
on the job queue. */
struct JobQueueSuspender
{
return suspend
? Callback ([=] () { suspend (continuation); })
: Callback ([=] () { continuation (doNothingCallback); });
}
/** Possibly suspend current execution. */
Suspend const suspend;
/** Possibly yield and restart on the job queue. */
Callback const yield;
/** Create a JobQueueSuspender where yield does nothing and the suspend
immediately executes the continuation. */
JobQueueSuspender();
/** Create a JobQueueSuspender with a Suspend.
When yield is called, it reschedules the current job on the JobQueue
with the given jobName. */
JobQueueSuspender(Suspend const&, std::string const& jobName);
};
} // RPC
} // ripple

View File

@@ -29,6 +29,7 @@
#include <ripple/protocol/JsonFields.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Status.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/impl/Handler.h>
#include <ripple/server/Role.h>
@@ -86,15 +87,15 @@ private:
template <class Object>
void LedgerHandler::writeResult (Object& value)
{
auto& yield = context_.suspend.yield;
if (ledger_)
{
Json::copyFrom (value, result_);
addJson (value, {*ledger_, options_, context_.yield});
addJson (value, {*ledger_, options_, yield});
}
else
{
auto& master = getApp().getLedgerMaster ();
auto& yield = context_.yield;
{
auto&& closed = Json::addObject (value, jss::closed);
addJson (closed, {*master.getClosedLedger(), 0, yield});

View File

@@ -82,8 +82,7 @@ Json::Value doRipplePathFind (RPC::Context& context)
Json::Value jvResult;
if (! context.suspend ||
getConfig().RUN_STANDALONE ||
if (getConfig().RUN_STANDALONE ||
context.params.isMember(jss::ledger) ||
context.params.isMember(jss::ledger_index) ||
context.params.isMember(jss::ledger_hash))
@@ -105,15 +104,13 @@ Json::Value doRipplePathFind (RPC::Context& context)
lpLedger = context.ledgerMaster.getClosedLedger();
PathRequest::pointer request;
suspend(context,
[&request, &context, &jvResult, &lpLedger]
(RPC::Callback const& callback)
context.suspend.suspend(
[&request, &context, &jvResult, &lpLedger]
(RPC::Callback const& callback)
{
jvResult = getApp().getPathRequests().makeLegacyPathRequest (
request, callback, lpLedger, context.params);
assert(callback);
if (! request && callback)
callback();
callback();
});
if (request)

View File

@@ -24,60 +24,56 @@
namespace ripple {
namespace RPC {
namespace {
using CoroutineType = Continuation;
using CoroutinePull = boost::coroutines::coroutine <CoroutineType>::pull_type;
using CoroutinePush = boost::coroutines::coroutine <CoroutineType>::push_type;
using BoostCoroutine = boost::coroutines::asymmetric_coroutine<CoroutineType>;
using Pull = BoostCoroutine::pull_type;
using Push = BoostCoroutine::push_type;
struct Coroutine::Impl : public std::enable_shared_from_this <Coroutine::Impl>
void runOnCoroutineImpl(std::shared_ptr<Pull> pull)
{
Impl (CoroutinePull&& pull_) : pull (std::move (pull_))
while (*pull)
{
}
(*pull)();
CoroutinePull pull;
if (! *pull)
return;
void run()
{
while (pull)
if (auto continuation = pull->get())
{
pull();
if (! pull)
return;
if (auto continuation = pull.get())
{
auto that = shared_from_this();
continuation ([that] () { that->run(); });
return;
}
continuation ([pull] () { runOnCoroutineImpl(pull); });
return;
}
}
};
Coroutine::Coroutine (SuspendCallback const& suspendCallback)
{
CoroutinePull pull ([suspendCallback] (CoroutinePush& push)
{
Suspend suspend = [&push] (CoroutineType const& cbc) {
push (cbc);
};
suspend ({});
suspendCallback (suspend);
});
impl_ = std::make_shared<Impl> (std::move (pull));
}
Coroutine::~Coroutine() = default;
} // namespace
void Coroutine::run()
void runOnCoroutine(Coroutine const& coroutine)
{
assert (impl_);
if (impl_)
impl_->run();
impl_.reset();
auto pullFunction = [coroutine] (Push& push) {
Suspend suspend = [&push] (CoroutineType const& cbc) {
if (push)
push (cbc);
};
// Run once doing nothing, to get the other side started.
suspend([] (Callback const& callback) { callback(); });
// Now run the coroutine.
coroutine(suspend);
};
runOnCoroutineImpl(std::make_shared<Pull>(pullFunction));
}
void runOnCoroutine(UseCoroutines useCoroutines, Coroutine const& coroutine)
{
if (useCoroutines == UseCoroutines::yes)
runOnCoroutine(coroutine);
else
coroutine(dontSuspend);
}
} // RPC

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/tests/TestOutputSuite.test.h>
@@ -25,6 +26,34 @@
namespace ripple {
namespace RPC {
static
UseCoroutines defaultUseCoroutines = UseCoroutines::no;
Suspend const dontSuspend = [] (Continuation const& continuation)
{
continuation([] () {});
};
namespace {
void runOnJobQueue(std::string const& name, Callback const& callback)
{
boost::function <void (Job&)> cb([callback] (Job&) { callback(); });
getApp().getJobQueue().addJob(jtCLIENT, name, cb);
};
Callback suspendForJobQueue(Suspend const& suspend, std::string const& jobName)
{
assert(suspend);
return Callback( [suspend, jobName] () {
suspend([jobName] (Callback const& callback) {
runOnJobQueue(jobName, callback);
});
});
}
} // namespace
Json::Output chunkedYieldingOutput (
Json::Output const& output, Callback const& yield, std::size_t chunkSize)
{
@@ -44,7 +73,6 @@ Json::Output chunkedYieldingOutput (
};
}
CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield)
: yieldCount_ (yieldCount), yield_ (yield)
{
@@ -52,10 +80,7 @@ CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield)
void CountedYield::yield()
{
if (!yield_)
return;
if (yieldCount_)
if (yieldCount_ && yield_)
{
if (++count_ >= yieldCount_)
{
@@ -65,28 +90,38 @@ void CountedYield::yield()
}
}
YieldStrategy makeYieldStrategy (Section const& s)
UseCoroutines useCoroutines(BasicConfig const& config)
{
if (auto use = config["section"].get<bool>("use_coroutines"))
return *use ? UseCoroutines::yes : UseCoroutines::no;
return defaultUseCoroutines;
}
YieldStrategy makeYieldStrategy (BasicConfig const& config)
{
auto s = config["section"];
YieldStrategy ys;
ys.streaming = get<bool> (s, "streaming") ?
YieldStrategy::Streaming::yes :
YieldStrategy::Streaming::no;
ys.useCoroutines = get<bool> (s, "use_coroutines") ?
YieldStrategy::UseCoroutines::yes :
YieldStrategy::UseCoroutines::no;
ys.byteYieldCount = get<std::size_t> (s, "byte_yield_count");
ys.useCoroutines = useCoroutines(config);
ys.accountYieldCount = get<std::size_t> (s, "account_yield_count");
ys.transactionYieldCount = get<std::size_t> (s, "transaction_yield_count");
return ys;
}
Continuation callbackOnJobQueue (
JobQueue& jobQueue, std::string const& name, JobType jobType)
JobQueueSuspender::JobQueueSuspender(
Suspend const& susp, std::string const& jobName)
: suspend(susp ? susp : dontSuspend),
yield(suspendForJobQueue(suspend, jobName))
{
// There's a non-empty jobName exactly if there's a non-empty Suspend.
assert(!(susp && jobName.empty()));
}
JobQueueSuspender::JobQueueSuspender() : JobQueueSuspender({}, {})
{
return Continuation ([name, jobType, &jobQueue] (Callback const& cb) {
jobQueue.addJob (jobType, name, [cb] (Job&) { cb(); });
});
}
} // RPC

View File

@@ -46,10 +46,9 @@ public:
};
Strings result;
SuspendCallback suspendCallback ([&] (Suspend const& suspend)
Coroutine coroutine ([&] (Suspend const& suspend)
{
Callback yield = suspendForContinuation (
suspend, makeContinuation ("*"));
Callback yield ([=] () { suspend (makeContinuation ("*")); });
auto out = chunkedYieldingOutput (output, yield, chunkSize);
out ("hello ");
result.push_back (buffer);
@@ -70,7 +69,7 @@ public:
result.push_back (buffer);
});
Coroutine (suspendCallback).run();
runOnCoroutine(UseCoroutines::yes, coroutine);
expectCollectionEquals (result, expected);
}

View File

@@ -63,8 +63,7 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent,
, m_networkOPs (networkOPs)
, m_server (HTTP::make_Server(
*this, io_service, deprecatedLogs().journal("Server")))
, m_continuation (RPC::callbackOnJobQueue (
jobQueue, "RPC-Coroutine", jtCLIENT))
, m_jobQueue (jobQueue)
{
auto const& group (cm.group ("rpc"));
rpc_requests_ = group->make_counter ("requests");
@@ -180,24 +179,15 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
auto detach = session.detach();
if (setup_.yieldStrategy.useCoroutines ==
RPC::YieldStrategy::UseCoroutines::yes)
{
RPC::SuspendCallback suspend (
// We can copy `this` because ServerHandlerImp is a long-lasting singleton.
auto job = [this, detach] (Job&) {
RPC::runOnCoroutine(
setup_.yieldStrategy.useCoroutines,
[this, detach] (RPC::Suspend const& suspend) {
processSession (detach, suspend);
processSession(detach, suspend);
});
RPC::Coroutine coroutine (suspend);
coroutine.run();
}
else
{
getApp().getJobQueue().addJob (
jtCLIENT, "RPC-Client",
[=] (Job&) {
processSession (detach, RPC::Suspend());
});
}
};
m_jobQueue.addJob(jtCLIENT, "RPC-Client", job);
}
void
@@ -240,15 +230,9 @@ ServerHandlerImp::processRequest (
Output&& output,
Suspend const& suspend)
{
auto yield = RPC::suspendForContinuation (suspend, m_continuation);
// Move off the webserver thread onto the JobQueue.
yield();
assert (getApp().getJobQueue().getJobForThread());
if (auto count = setup_.yieldStrategy.byteYieldCount)
output = RPC::chunkedYieldingOutput (std::move (output), yield, count);
Json::Value jsonRPC;
{
Json::Reader reader;
@@ -267,7 +251,6 @@ ServerHandlerImp::processRequest (
// VFALCO NOTE Except that "id" isn't included in the following errors.
//
Json::Value const& id = jsonRPC ["id"];
Json::Value const& method = jsonRPC ["method"];
if (! method) {
@@ -365,9 +348,11 @@ ServerHandlerImp::processRequest (
<< "doRpcCommand:" << strMethod << ":" << params;
auto const start (std::chrono::high_resolution_clock::now ());
RPC::Context context {
params, loadType, m_networkOPs, getApp().getLedgerMaster(), role,
nullptr, std::move (suspend), std::move (yield)};
nullptr, {suspend, "RPC-Coroutine"}};
std::string response;
if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes)
@@ -755,11 +740,12 @@ setup_Overlay (ServerHandler::Setup& setup)
}
ServerHandler::Setup
setup_ServerHandler (BasicConfig const& config, std::ostream& log)
setup_ServerHandler(BasicConfig const& config, std::ostream& log)
{
ServerHandler::Setup setup;
setup.ports = detail::parse_Ports (config, log);
setup.yieldStrategy = RPC::makeYieldStrategy (config["server"]);
setup.ports = detail::parse_Ports(config, log);
setup.yieldStrategy = RPC::makeYieldStrategy(config);
detail::setup_Client(setup);
detail::setup_Overlay(setup);

View File

@@ -39,8 +39,8 @@ private:
beast::Journal m_journal;
NetworkOPs& m_networkOPs;
std::unique_ptr<HTTP::Server> m_server;
RPC::Continuation m_continuation;
Setup setup_;
JobQueue& m_jobQueue;
beast::insight::Counter rpc_requests_;
beast::insight::Event rpc_io_;
beast::insight::Event rpc_size_;

View File

@@ -24,7 +24,6 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/Log.h>
#include <ripple/core/Config.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/net/RPCErr.h>
@@ -32,10 +31,12 @@
#include <ripple/protocol/JsonFields.h>
#include <ripple/resource/Fees.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/rpc/Coroutine.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/server/Port.h>
#include <ripple/json/to_string.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/Yield.h>
#include <ripple/server/Role.h>
#include <ripple/websocket/WebSocket.h>
@@ -94,7 +95,7 @@ public:
message_ptr getMessage ();
bool checkMessage ();
void returnMessage (message_ptr const&);
Json::Value invokeCommand (Json::Value& jvRequest);
Json::Value invokeCommand (Json::Value const& jvRequest, RPC::Suspend const&);
// Generically implemented per version.
void setPingTimer ();
@@ -226,7 +227,8 @@ void ConnectionImpl <WebSocket>::returnMessage (message_ptr const& ptr)
}
template <class WebSocket>
Json::Value ConnectionImpl <WebSocket>::invokeCommand (Json::Value& jvRequest)
Json::Value ConnectionImpl <WebSocket>::invokeCommand (
Json::Value const& jvRequest, RPC::Suspend const& suspend)
{
if (getConsumer().disconnect ())
{
@@ -269,7 +271,8 @@ Json::Value ConnectionImpl <WebSocket>::invokeCommand (Json::Value& jvRequest)
{
RPC::Context context {
jvRequest, loadType, m_netOPs, getApp().getLedgerMaster(), role,
std::dynamic_pointer_cast<InfoSub> (this->shared_from_this ())};
this->shared_from_this (),
{suspend, "WSClient::command"}};
RPC::doCommand (context, jvResult[jss::result]);
}

View File

@@ -56,6 +56,7 @@ beast::IP::Endpoint makeBeastEndpoint (beast::IP::Endpoint const& e)
return e;
}
template <class WebSocket>
class HandlerImpl
: public WebSocket::Handler
@@ -293,7 +294,14 @@ public:
getApp().getJobQueue ().addJob (
jtCLIENT,
"WSClient::destroy",
std::bind (&ConnectionImpl <WebSocket>::destroy, ptr));
[ptr] (Job&) { ConnectionImpl <WebSocket>::destroy(ptr); });
}
void message_job(std::string const& name,
connection_ptr const& cpClient)
{
auto msgs = [this, cpClient] (Job& j) { do_messages(j, cpClient); };
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::" + name, msgs);
}
void on_message (connection_ptr cpClient, message_ptr mpMessage) override
@@ -327,9 +335,7 @@ public:
}
if (bRunQ)
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command",
std::bind (&HandlerImpl <WebSocket>::do_messages,
this, std::placeholders::_1, cpClient));
message_job("command", cpClient);
}
void do_messages (Job& job, connection_ptr const& cpClient)
@@ -364,17 +370,14 @@ public:
}
if (ptr->checkMessage ())
getApp().getJobQueue ().addJob (
jtCLIENT, "WSClient::more",
std::bind (&HandlerImpl <WebSocket>::do_messages, this,
std::placeholders::_1, cpClient));
message_job("more", cpClient);
}
bool do_message (Job& job, const connection_ptr& cpClient,
const wsc_ptr& conn, const message_ptr& mpMessage)
{
Json::Value jvRequest;
Json::Reader jrReader;
Json::Value jvRequest;
Json::Reader jrReader;
try
{
@@ -414,19 +417,38 @@ public:
{
Json::Value& jCmd = jvRequest[jss::command];
if (jCmd.isString())
job.rename (std::string ("WSClient::") + jCmd.asString());
job.rename ("WSClient::" + jCmd.asString());
}
auto const start (std::chrono::high_resolution_clock::now ());
Json::Value const jvObj (conn->invokeCommand (jvRequest));
std::string const buffer (to_string (jvObj));
auto const start = std::chrono::high_resolution_clock::now ();
struct HandlerCoroutineData
{
Json::Value jvRequest;
std::string buffer;
wsc_ptr conn;
};
auto data = std::make_shared<HandlerCoroutineData>();
data->jvRequest = std::move(jvRequest);
data->conn = conn;
auto coroutine = [data] (RPC::Suspend const& suspend) {
data->buffer = to_string(
data->conn->invokeCommand(data->jvRequest, suspend));
};
static auto const disableWebsocketsCoroutines = true;
auto useCoroutines = disableWebsocketsCoroutines ?
RPC::UseCoroutines::no : RPC::useCoroutines(desc_.config);
runOnCoroutine(useCoroutines, coroutine);
rpc_time_.notify (static_cast <beast::insight::Event::value_type> (
std::chrono::duration_cast <std::chrono::milliseconds> (
std::chrono::high_resolution_clock::now () - start)));
++rpc_requests_;
rpc_size_.notify (static_cast <beast::insight::Event::value_type>
(buffer.size ()));
send (cpClient, buffer, false);
(data->buffer.size()));
send (cpClient, data->buffer, false);
}
return true;