Allow RPC yield to be a continuation.

This commit is contained in:
Tom Ritchford
2015-05-26 23:17:24 -04:00
committed by Nik Bougalis
parent b2b0377717
commit b8526f7894
10 changed files with 311 additions and 108 deletions

View File

@@ -35,7 +35,7 @@ struct LedgerFill
{
LedgerFill (Ledger& l,
int o = 0,
RPC::Yield const& y = {},
RPC::Callback const& y = {},
RPC::YieldStrategy const& ys = {})
: ledger (l),
options (o),
@@ -49,7 +49,7 @@ struct LedgerFill
Ledger& ledger;
int options;
RPC::Yield yield;
RPC::Callback yield;
RPC::YieldStrategy yieldStrategy;
};

View File

@@ -40,7 +40,8 @@ struct Context
NetworkOPs& netOps;
Role role;
InfoSub::pointer infoSub;
RPC::Yield yield;
Suspend suspend;
Callback yield;
NodeStore::ScopedMetrics metrics;
};

View File

@@ -25,27 +25,26 @@
namespace ripple {
namespace RPC {
/** Runs a function that takes a yield as a coroutine. */
/** SuspendCallback: a function that a Coroutine gives to the coroutine
scheduler so that it gets a callback with a Suspend when it runs.
*/
using SuspendCallback = std::function <void (Suspend const&)>;
/** Runs a function that takes a SuspendCallback as a coroutine. */
class Coroutine
{
public:
using YieldFunction = std::function <void (Yield const&)>;
explicit Coroutine (YieldFunction const&);
explicit Coroutine (SuspendCallback const&);
~Coroutine();
/** Is the coroutine finished? */
operator bool() const;
/** Run one more step of the coroutine. */
void operator()() const;
/** Run the coroutine and guarantee completion. */
void run ();
private:
struct Impl;
class Impl;
std::shared_ptr <Impl> impl_;
std::shared_ptr<Impl> impl_;
// We'd prefer to use std::unique_ptr here, but unfortunately, in C++11
// move semantics don't work well with `std::bind` or lambdas.
Coroutine (std::shared_ptr <Impl> const&);
};
} // RPC

View File

@@ -1,3 +1,72 @@
# RPC
# How to use RPC coroutines.
New code to generalize the operation of RPC commands
## Introduction.
By default, an RPC handler runs as an uninterrupted task on the JobQueue. This
is fine for commands that are fast to compute but might not be acceptable for
tasks that require multiple parts or are large, like a full ledger.
For this purpose, the rippled RPC handler allows *suspension with continuation*
- a request to suspend execution of the RPC response and to continue it after
some function or job has been executed. A default continuation is supplied
which simply reschedules the job on the JobQueue, or the programmer can supply
their own.
## The classes.
Suspension with continuation uses four `std::function`s in the `ripple::RPC`
namespace:
using Callback = std::function <void ()>;
using Continuation = std::function <void (Callback const&)>;
using Suspend = std::function <void (Continuation const&)>;
using SuspendCallback = std::function <void (Suspend const&)>;
A `Callback` is a generic 0-argument function. A given `Callback` might or might
not block. Unless otherwise advised, do not hold locks or any resource that
would prevent any other task from making forward progress when you call a
`Callback`.
A `Continuation` is a function that is given a `Callback` and promises to call
it later. A `Continuation` guarantees to call the `Callback` exactly once at
some point in the future, but it does not have to be immediately or even in the
current thread.
A `Suspend` is a function belonging to a `Coroutine`. A `Suspend` runs a
`Continuation`, passing it a `Callback` that continues execution of the
`Coroutine`.
And finally, a `SuspendCallback` is a `std::function` which is given a
`Suspend`. This is what the RPC handler gives to the coroutine manager,
expecting to get called back with a `Suspend` and to be able to start execution.
## The flow of control.
Given these functions, the flow of RPC control when using coroutines is
straight-forward.
1. The instance of `ServerHandler` receives an RPC request.
2. It creates a `SuspendCallback` and gives it to the coroutine manager.
3. The coroutine manager creates a `Coroutine`, starts it up, and then calls
the `SuspendCallback` with a `Suspend`.
4. Now the RPC response starts to be calculated.
5. When the RPC handler wants to suspend, it calls the `Suspend` function with
a `Continuation`.
6. Coroutine execution is suspended.
7. The `Continuation` is called with a `Callback` that the coroutine manager
creates.
8. The `Continuation` may choose to execute immediately, defer execution on the
job queue, or wait for some resource to be free.
9. When the `Continuation` is finished, it calls the `Callback` that the
coroutine manager gave it, perhaps a long time ago.
10. This `Callback` continues execution on the suspended `Coroutine` from where
it left off.

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_RPC_YIELD_H_INCLUDED
#define RIPPLE_RPC_YIELD_H_INCLUDED
#include <ripple/core/JobQueue.h>
#include <ripple/json/Output.h>
#include <beast/win32_workaround.h>
#include <boost/coroutine/all.hpp>
@@ -27,19 +28,26 @@
namespace ripple {
class JobQueue;
class Section;
namespace RPC {
/** Yield is a generic placeholder for a function that yields control of
execution - perhaps to another coroutine.
/** See the README.md in this directory for more information about how
the RPC yield mechanism works.
*/
When code calls Yield, it might block for an indeterminate period of time.
/** Callback: do something and eventually return. */
using Callback = std::function <void ()>;
By convention you must not be holding any locks or any resource that would
prevent any other task from making forward progress when you call Yield.
*/
using Yield = std::function <void ()>;
static
Callback const emptyCallback ([] () {});
/** Continuation: do something, guarantee to eventually call Callback. */
using Continuation = std::function <void (Callback const&)>;
/** Suspend: suspend execution, pending completion of a Continuation. */
using Suspend = std::function <void (Continuation const&)>;
/** Wrap an Output so it yields after approximately `chunkSize` bytes.
@@ -51,19 +59,19 @@ using Yield = std::function <void ()>;
then never send more data.
*/
Json::Output chunkedYieldingOutput (
Json::Output const&, Yield const&, std::size_t chunkSize);
Json::Output const&, Callback const&, std::size_t chunkSize);
/** Yield every yieldCount calls. If yieldCount is 0, never yield. */
class CountedYield
{
public:
CountedYield (std::size_t yieldCount, Yield const& yield);
CountedYield (std::size_t yieldCount, Callback const& yield);
void yield();
private:
std::size_t count_ = 0;
std::size_t const yieldCount_;
Yield const yield_;
Callback const yield_;
};
/** When do we yield when performing a ledger computation? */
@@ -95,6 +103,21 @@ struct YieldStrategy
/** Create a yield strategy from a configuration Section. */
YieldStrategy makeYieldStrategy (Section const&);
/** Return a continuation that runs a Callback on a Job Queue with a given
name and JobType. */
Continuation callbackOnJobQueue (
JobQueue&, std::string const& jobName, JobType);
/** Return a Callback that will suspend and then run a continuation. */
inline
Callback suspendForContinuation (
Suspend const& suspend, Continuation const& continuation)
{
return suspend
? Callback ([=] () { suspend (continuation); })
: emptyCallback;
}
} // RPC
} // ripple

View File

@@ -20,25 +20,51 @@
#include <BeastConfig.h>
#include <ripple/rpc/Coroutine.h>
#include <ripple/rpc/tests/TestOutputSuite.test.h>
#include <iostream>
namespace ripple {
namespace RPC {
using CoroutinePull = boost::coroutines::coroutine <void>::pull_type;
using CoroutineType = Continuation;
using CoroutinePull = boost::coroutines::coroutine <CoroutineType>::pull_type;
using CoroutinePush = boost::coroutines::coroutine <CoroutineType>::push_type;
struct Coroutine::Impl : CoroutinePull
struct Coroutine::Impl : public std::enable_shared_from_this <Coroutine::Impl>
{
Impl (CoroutinePull&& p) : CoroutinePull (std::move(p)) {}
Impl (CoroutinePull&& pull_) : pull (std::move (pull_))
{
}
CoroutinePull pull;
void run()
{
while (pull)
{
pull();
if (! pull)
return;
if (auto continuation = pull.get())
{
auto that = shared_from_this();
continuation ([that] () { that->run(); });
return;
}
}
}
};
Coroutine::Coroutine (YieldFunction const& yieldFunction)
Coroutine::Coroutine (SuspendCallback const& suspendCallback)
{
CoroutinePull pull ([yieldFunction] (
boost::coroutines::coroutine <void>::push_type& push)
CoroutinePull pull ([suspendCallback] (CoroutinePush& push)
{
Yield yield = [&push] () { push(); };
yield ();
yieldFunction (yield);
Suspend suspend = [&push] (CoroutineType const& cbc) {
push (cbc);
};
suspend ({});
suspendCallback (suspend);
});
impl_ = std::make_shared<Impl> (std::move (pull));
@@ -46,14 +72,12 @@ Coroutine::Coroutine (YieldFunction const& yieldFunction)
Coroutine::~Coroutine() = default;
Coroutine::operator bool() const
void Coroutine::run()
{
return bool (*impl_);
}
void Coroutine::operator()() const
{
(*impl_)();
assert (impl_);
if (impl_)
impl_->run();
impl_.reset();
}
} // RPC

View File

@@ -25,8 +25,11 @@ namespace ripple {
namespace RPC {
Json::Output chunkedYieldingOutput (
Json::Output const& output, Yield const& yield, std::size_t chunkSize)
Json::Output const& output, Callback const& yield, std::size_t chunkSize)
{
if (!yield)
return output;
auto count = std::make_shared <std::size_t> (0);
return [chunkSize, count, output, yield] (boost::string_ref const& bytes)
{
@@ -41,14 +44,18 @@ Json::Output chunkedYieldingOutput (
}
CountedYield::CountedYield (std::size_t yieldCount, Yield const& yield)
CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield)
: yieldCount_ (yieldCount), yield_ (yield)
{
}
void CountedYield::yield()
{
if (yieldCount_) {
if (!yield_)
return;
if (yieldCount_)
{
if (++count_ >= yieldCount_)
{
yield_();
@@ -73,5 +80,13 @@ YieldStrategy makeYieldStrategy (Section const& s)
return ys;
}
Continuation callbackOnJobQueue (
JobQueue& jobQueue, std::string const& name, JobType jobType)
{
return Continuation ([name, jobType, &jobQueue] (Callback const& cb) {
jobQueue.addJob (jobType, name, [cb] (Job&) { cb(); });
});
}
} // RPC
} // ripple

View File

@@ -30,40 +30,119 @@ class Coroutine_test : public TestOutputSuite
public:
using Strings = std::vector <std::string>;
void test (std::string const& name, int chunkSize, Strings const& expected)
void test (int chunkSize, Strings const& expected)
{
auto name = std::to_string (chunkSize);
setup (name);
std::string buffer;
Json::Output output = Json::stringOutput (buffer);
auto coroutine = Coroutine ([=] (Yield yield)
{
auto out = chunkedYieldingOutput (output, yield, chunkSize);
out ("hello ");
out ("there ");
out ("world.");
auto makeContinuation = [&] (std::string const& data) {
return Continuation ([=] (Callback const& cb) {
output (data + " ");
cb();
});
};
Strings result;
while (coroutine)
SuspendCallback suspendCallback ([&] (Suspend const& suspend)
{
coroutine();
Callback yield = suspendForContinuation (
suspend, makeContinuation ("*"));
auto out = chunkedYieldingOutput (output, yield, chunkSize);
out ("hello ");
result.push_back (buffer);
}
suspend (makeContinuation("HELLO"));
result.push_back (buffer);
out ("there ");
result.push_back (buffer);
suspend (makeContinuation("THERE"));
result.push_back (buffer);
out ("world ");
result.push_back (buffer);
suspend (makeContinuation("WORLD"));
result.push_back (buffer);
});
Coroutine (suspendCallback).run();
expectCollectionEquals (result, expected);
static
auto const printResults = false;
if (! printResults)
return;
std::string indent1 = " ";
std::string indent2 = indent1 + " ";
std::cerr << indent1 << "test (" + name + ", {";
for (auto i = 0; i < result.size(); ++i) {
if (i)
std::cerr << ",";
std::cerr << "\n" << indent2;
std::cerr << '"' << result[i] << '"';
}
std::cerr << "\n" << indent2 << "});\n";
expect(true);
}
void run() override
{
test ("zero", 0, {"hello ", "hello there ", "hello there world."});
test ("three", 3, {"hello ", "hello there ", "hello there world."});
test ("five", 5, {"hello ", "hello there ", "hello there world."});
test ("seven", 7, {"hello there ", "hello there world."});
test ("ten", 10, {"hello there ", "hello there world."});
test ("thirteen", 13, {"hello there world."});
test ("fifteen", 15, {"hello there world."});
test (0, {"hello ",
"hello HELLO ",
"hello HELLO * there ",
"hello HELLO * there THERE ",
"hello HELLO * there THERE * world ",
"hello HELLO * there THERE * world WORLD "
});
test (3, {"hello ",
"hello HELLO ",
"hello HELLO * there ",
"hello HELLO * there THERE ",
"hello HELLO * there THERE * world ",
"hello HELLO * there THERE * world WORLD "
});
test (5, {"hello ",
"hello HELLO ",
"hello HELLO * there ",
"hello HELLO * there THERE ",
"hello HELLO * there THERE * world ",
"hello HELLO * there THERE * world WORLD "
});
test (7, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE * world ",
"hello HELLO there THERE * world WORLD "
});
test (10, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE * world ",
"hello HELLO there THERE * world WORLD "
});
test (13, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE world ",
"hello HELLO there THERE world WORLD "
});
test (15, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE world ",
"hello HELLO there THERE world WORLD "
});
}
};

View File

@@ -64,6 +64,8 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent,
, m_networkOPs (networkOPs)
, m_server (HTTP::make_Server(
*this, io_service, deprecatedLogs().journal("Server")))
, m_continuation (RPC::callbackOnJobQueue (
jobQueue, "RPC-Coroutine", jtCLIENT))
{
auto const& group (cm.group ("rpc"));
rpc_requests_ = group->make_counter ("requests");
@@ -156,27 +158,6 @@ Json::Output makeOutput (HTTP::Session& session)
};
}
namespace {
void runCoroutine (RPC::Coroutine coroutine, JobQueue& jobQueue)
{
if (!coroutine)
return;
coroutine();
if (!coroutine)
return;
// Reschedule the job on the job queue.
jobQueue.addJob (
jtCLIENT, "RPC-Coroutine",
[coroutine, &jobQueue] (Job&)
{
runCoroutine (coroutine, jobQueue);
});
}
} // namespace
void
ServerHandlerImp::onRequest (HTTP::Session& session)
{
@@ -190,8 +171,8 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
}
// Check user/password authorization
if (! authorized (session.port(),
build_map(session.request().headers)))
if (! authorized (
session.port(), build_map(session.request().headers)))
{
HTTPReply (403, "Forbidden", makeOutput (session));
session.close (true);
@@ -203,15 +184,20 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
if (setup_.yieldStrategy.useCoroutines ==
RPC::YieldStrategy::UseCoroutines::yes)
{
RPC::Coroutine::YieldFunction yieldFunction =
[this, detach] (Yield const& y) { processSession (detach, y); };
runCoroutine (RPC::Coroutine (yieldFunction), m_jobQueue);
RPC::SuspendCallback suspend (
[this, detach] (RPC::Suspend const& suspend) {
processSession (detach, suspend);
});
RPC::Coroutine coroutine (suspend);
coroutine.run();
}
else
{
m_jobQueue.addJob (
jtCLIENT, "RPC-Client",
[=] (Job&) { processSession (detach, RPC::Yield{}); });
[=] (Job&) {
processSession (detach, RPC::Suspend());
});
}
}
@@ -229,21 +215,17 @@ ServerHandlerImp::onStopped (HTTP::Server&)
//------------------------------------------------------------------------------
// Dispatched on the job queue
// Run as a couroutine.
void
ServerHandlerImp::processSession (
std::shared_ptr<HTTP::Session> const& session, Yield const& yield)
std::shared_ptr<HTTP::Session> const& session, Suspend const& suspend)
{
auto output = makeOutput (*session);
if (auto byteYieldCount = setup_.yieldStrategy.byteYieldCount)
output = RPC::chunkedYieldingOutput (output, yield, byteYieldCount);
processRequest (
session->port(),
to_string (session->body()),
session->remoteAddress().at_port (0),
output,
yield);
makeOutput (*session),
suspend);
if (session->request().keep_alive())
session->complete();
@@ -256,9 +238,18 @@ ServerHandlerImp::processRequest (
HTTP::Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output output,
Yield yield)
Output&& output,
Suspend const& suspend)
{
auto yield = RPC::suspendForContinuation (suspend, m_continuation);
// Move off the webserver thread onto the JobQueue.
yield();
assert (getApp().getJobQueue().getJobForThread());
if (auto count = setup_.yieldStrategy.byteYieldCount)
output = RPC::chunkedYieldingOutput (std::move (output), yield, count);
Json::Value jsonRPC;
{
Json::Reader reader;
@@ -374,7 +365,9 @@ ServerHandlerImp::processRequest (
<< "doRpcCommand:" << strMethod << ":" << params;
auto const start (std::chrono::high_resolution_clock::now ());
RPC::Context context {params, loadType, m_networkOPs, role, nullptr, yield};
RPC::Context context {
params, loadType, m_networkOPs, role, nullptr,
std::move (suspend), std::move (yield)};
std::string response;
if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes)
@@ -641,7 +634,6 @@ to_Port(ParsedPort const& parsed, std::ostream& log)
throw std::exception();
}
p.port = *parsed.port;
if (parsed.admin_ip)
p.admin_ip = *parsed.admin_ip;

View File

@@ -40,6 +40,7 @@ private:
JobQueue& m_jobQueue;
NetworkOPs& m_networkOPs;
std::unique_ptr<HTTP::Server> m_server;
RPC::Continuation m_continuation;
Setup setup_;
beast::insight::Counter rpc_requests_;
beast::insight::Event rpc_io_;
@@ -55,7 +56,7 @@ public:
private:
using Output = Json::Output;
using Yield = RPC::Yield;
using Suspend = RPC::Suspend;
void
setup (Setup const& setup, beast::Journal journal) override;
@@ -107,11 +108,11 @@ private:
//--------------------------------------------------------------------------
void
processSession (std::shared_ptr<HTTP::Session> const&, Yield const&);
processSession (std::shared_ptr<HTTP::Session> const&, Suspend const&);
void
processRequest (HTTP::Port const& port, std::string const& request,
beast::IP::Endpoint const& remoteIPAddress, Output, Yield);
beast::IP::Endpoint const& remoteIPAddress, Output&&, Suspend const&);
//
// PropertyStream