From b8526f7894189341d2d114c21a6b6a1f0cf08937 Mon Sep 17 00:00:00 2001 From: Tom Ritchford Date: Tue, 26 May 2015 23:17:24 -0400 Subject: [PATCH] Allow RPC yield to be a continuation. --- src/ripple/app/ledger/LedgerToJson.h | 4 +- src/ripple/rpc/Context.h | 3 +- src/ripple/rpc/Coroutine.h | 25 ++--- src/ripple/rpc/README.md | 73 +++++++++++- src/ripple/rpc/Yield.h | 43 +++++-- src/ripple/rpc/impl/Coroutine.cpp | 56 +++++++--- src/ripple/rpc/impl/Yield.cpp | 21 +++- src/ripple/rpc/tests/Coroutine.test.cpp | 117 ++++++++++++++++---- src/ripple/server/impl/ServerHandlerImp.cpp | 70 ++++++------ src/ripple/server/impl/ServerHandlerImp.h | 7 +- 10 files changed, 311 insertions(+), 108 deletions(-) diff --git a/src/ripple/app/ledger/LedgerToJson.h b/src/ripple/app/ledger/LedgerToJson.h index 6770a0ea1..1daf4109b 100644 --- a/src/ripple/app/ledger/LedgerToJson.h +++ b/src/ripple/app/ledger/LedgerToJson.h @@ -35,7 +35,7 @@ struct LedgerFill { LedgerFill (Ledger& l, int o = 0, - RPC::Yield const& y = {}, + RPC::Callback const& y = {}, RPC::YieldStrategy const& ys = {}) : ledger (l), options (o), @@ -49,7 +49,7 @@ struct LedgerFill Ledger& ledger; int options; - RPC::Yield yield; + RPC::Callback yield; RPC::YieldStrategy yieldStrategy; }; diff --git a/src/ripple/rpc/Context.h b/src/ripple/rpc/Context.h index 159c6cb13..c9671fd0a 100644 --- a/src/ripple/rpc/Context.h +++ b/src/ripple/rpc/Context.h @@ -40,7 +40,8 @@ struct Context NetworkOPs& netOps; Role role; InfoSub::pointer infoSub; - RPC::Yield yield; + Suspend suspend; + Callback yield; NodeStore::ScopedMetrics metrics; }; diff --git a/src/ripple/rpc/Coroutine.h b/src/ripple/rpc/Coroutine.h index 7a8784b5b..1a42ab533 100644 --- a/src/ripple/rpc/Coroutine.h +++ b/src/ripple/rpc/Coroutine.h @@ -25,27 +25,26 @@ namespace ripple { namespace RPC { -/** Runs a function that takes a yield as a coroutine. */ +/** SuspendCallback: a function that a Coroutine gives to the coroutine + scheduler so that it gets a callback with a Suspend when it runs. + */ +using SuspendCallback = std::function ; + +/** Runs a function that takes a SuspendCallback as a coroutine. */ class Coroutine { public: - using YieldFunction = std::function ; - - explicit Coroutine (YieldFunction const&); + explicit Coroutine (SuspendCallback const&); ~Coroutine(); - /** Is the coroutine finished? */ - operator bool() const; - - /** Run one more step of the coroutine. */ - void operator()() const; + /** Run the coroutine and guarantee completion. */ + void run (); private: - struct Impl; + class Impl; + std::shared_ptr impl_; - std::shared_ptr impl_; - // We'd prefer to use std::unique_ptr here, but unfortunately, in C++11 - // move semantics don't work well with `std::bind` or lambdas. + Coroutine (std::shared_ptr const&); }; } // RPC diff --git a/src/ripple/rpc/README.md b/src/ripple/rpc/README.md index cb52d938d..19e625dd3 100644 --- a/src/ripple/rpc/README.md +++ b/src/ripple/rpc/README.md @@ -1,3 +1,72 @@ -# RPC +# How to use RPC coroutines. -New code to generalize the operation of RPC commands +## Introduction. + +By default, an RPC handler runs as an uninterrupted task on the JobQueue. This +is fine for commands that are fast to compute but might not be acceptable for +tasks that require multiple parts or are large, like a full ledger. + +For this purpose, the rippled RPC handler allows *suspension with continuation* +- a request to suspend execution of the RPC response and to continue it after +some function or job has been executed. A default continuation is supplied +which simply reschedules the job on the JobQueue, or the programmer can supply +their own. + +## The classes. + +Suspension with continuation uses four `std::function`s in the `ripple::RPC` +namespace: + + using Callback = std::function ; + using Continuation = std::function ; + using Suspend = std::function ; + using SuspendCallback = std::function ; + +A `Callback` is a generic 0-argument function. A given `Callback` might or might +not block. Unless otherwise advised, do not hold locks or any resource that +would prevent any other task from making forward progress when you call a +`Callback`. + +A `Continuation` is a function that is given a `Callback` and promises to call +it later. A `Continuation` guarantees to call the `Callback` exactly once at +some point in the future, but it does not have to be immediately or even in the +current thread. + +A `Suspend` is a function belonging to a `Coroutine`. A `Suspend` runs a +`Continuation`, passing it a `Callback` that continues execution of the +`Coroutine`. + +And finally, a `SuspendCallback` is a `std::function` which is given a +`Suspend`. This is what the RPC handler gives to the coroutine manager, +expecting to get called back with a `Suspend` and to be able to start execution. + +## The flow of control. + +Given these functions, the flow of RPC control when using coroutines is +straight-forward. + +1. The instance of `ServerHandler` receives an RPC request. + +2. It creates a `SuspendCallback` and gives it to the coroutine manager. + +3. The coroutine manager creates a `Coroutine`, starts it up, and then calls + the `SuspendCallback` with a `Suspend`. + +4. Now the RPC response starts to be calculated. + +5. When the RPC handler wants to suspend, it calls the `Suspend` function with + a `Continuation`. + +6. Coroutine execution is suspended. + +7. The `Continuation` is called with a `Callback` that the coroutine manager + creates. + +8. The `Continuation` may choose to execute immediately, defer execution on the + job queue, or wait for some resource to be free. + +9. When the `Continuation` is finished, it calls the `Callback` that the + coroutine manager gave it, perhaps a long time ago. + +10. This `Callback` continues execution on the suspended `Coroutine` from where + it left off. diff --git a/src/ripple/rpc/Yield.h b/src/ripple/rpc/Yield.h index cc50fcedc..73dffafbc 100644 --- a/src/ripple/rpc/Yield.h +++ b/src/ripple/rpc/Yield.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_RPC_YIELD_H_INCLUDED #define RIPPLE_RPC_YIELD_H_INCLUDED +#include #include #include #include @@ -27,19 +28,26 @@ namespace ripple { +class JobQueue; class Section; namespace RPC { -/** Yield is a generic placeholder for a function that yields control of - execution - perhaps to another coroutine. +/** See the README.md in this directory for more information about how + the RPC yield mechanism works. + */ - When code calls Yield, it might block for an indeterminate period of time. +/** Callback: do something and eventually return. */ +using Callback = std::function ; - By convention you must not be holding any locks or any resource that would - prevent any other task from making forward progress when you call Yield. -*/ -using Yield = std::function ; +static +Callback const emptyCallback ([] () {}); + +/** Continuation: do something, guarantee to eventually call Callback. */ +using Continuation = std::function ; + +/** Suspend: suspend execution, pending completion of a Continuation. */ +using Suspend = std::function ; /** Wrap an Output so it yields after approximately `chunkSize` bytes. @@ -51,19 +59,19 @@ using Yield = std::function ; then never send more data. */ Json::Output chunkedYieldingOutput ( - Json::Output const&, Yield const&, std::size_t chunkSize); + Json::Output const&, Callback const&, std::size_t chunkSize); /** Yield every yieldCount calls. If yieldCount is 0, never yield. */ class CountedYield { public: - CountedYield (std::size_t yieldCount, Yield const& yield); + CountedYield (std::size_t yieldCount, Callback const& yield); void yield(); private: std::size_t count_ = 0; std::size_t const yieldCount_; - Yield const yield_; + Callback const yield_; }; /** When do we yield when performing a ledger computation? */ @@ -95,6 +103,21 @@ struct YieldStrategy /** Create a yield strategy from a configuration Section. */ YieldStrategy makeYieldStrategy (Section const&); +/** Return a continuation that runs a Callback on a Job Queue with a given + name and JobType. */ +Continuation callbackOnJobQueue ( + JobQueue&, std::string const& jobName, JobType); + +/** Return a Callback that will suspend and then run a continuation. */ +inline +Callback suspendForContinuation ( + Suspend const& suspend, Continuation const& continuation) +{ + return suspend + ? Callback ([=] () { suspend (continuation); }) + : emptyCallback; +} + } // RPC } // ripple diff --git a/src/ripple/rpc/impl/Coroutine.cpp b/src/ripple/rpc/impl/Coroutine.cpp index deaa4f52c..f7d0b1208 100644 --- a/src/ripple/rpc/impl/Coroutine.cpp +++ b/src/ripple/rpc/impl/Coroutine.cpp @@ -20,25 +20,51 @@ #include #include #include +#include namespace ripple { namespace RPC { -using CoroutinePull = boost::coroutines::coroutine ::pull_type; +using CoroutineType = Continuation; +using CoroutinePull = boost::coroutines::coroutine ::pull_type; +using CoroutinePush = boost::coroutines::coroutine ::push_type; -struct Coroutine::Impl : CoroutinePull +struct Coroutine::Impl : public std::enable_shared_from_this { - Impl (CoroutinePull&& p) : CoroutinePull (std::move(p)) {} + Impl (CoroutinePull&& pull_) : pull (std::move (pull_)) + { + } + + CoroutinePull pull; + + void run() + { + while (pull) + { + pull(); + + if (! pull) + return; + + if (auto continuation = pull.get()) + { + auto that = shared_from_this(); + continuation ([that] () { that->run(); }); + return; + } + } + } }; -Coroutine::Coroutine (YieldFunction const& yieldFunction) +Coroutine::Coroutine (SuspendCallback const& suspendCallback) { - CoroutinePull pull ([yieldFunction] ( - boost::coroutines::coroutine ::push_type& push) + CoroutinePull pull ([suspendCallback] (CoroutinePush& push) { - Yield yield = [&push] () { push(); }; - yield (); - yieldFunction (yield); + Suspend suspend = [&push] (CoroutineType const& cbc) { + push (cbc); + }; + suspend ({}); + suspendCallback (suspend); }); impl_ = std::make_shared (std::move (pull)); @@ -46,14 +72,12 @@ Coroutine::Coroutine (YieldFunction const& yieldFunction) Coroutine::~Coroutine() = default; -Coroutine::operator bool() const +void Coroutine::run() { - return bool (*impl_); -} - -void Coroutine::operator()() const -{ - (*impl_)(); + assert (impl_); + if (impl_) + impl_->run(); + impl_.reset(); } } // RPC diff --git a/src/ripple/rpc/impl/Yield.cpp b/src/ripple/rpc/impl/Yield.cpp index 9509891ef..b604e787e 100644 --- a/src/ripple/rpc/impl/Yield.cpp +++ b/src/ripple/rpc/impl/Yield.cpp @@ -25,8 +25,11 @@ namespace ripple { namespace RPC { Json::Output chunkedYieldingOutput ( - Json::Output const& output, Yield const& yield, std::size_t chunkSize) + Json::Output const& output, Callback const& yield, std::size_t chunkSize) { + if (!yield) + return output; + auto count = std::make_shared (0); return [chunkSize, count, output, yield] (boost::string_ref const& bytes) { @@ -41,14 +44,18 @@ Json::Output chunkedYieldingOutput ( } -CountedYield::CountedYield (std::size_t yieldCount, Yield const& yield) +CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield) : yieldCount_ (yieldCount), yield_ (yield) { } void CountedYield::yield() { - if (yieldCount_) { + if (!yield_) + return; + + if (yieldCount_) + { if (++count_ >= yieldCount_) { yield_(); @@ -73,5 +80,13 @@ YieldStrategy makeYieldStrategy (Section const& s) return ys; } +Continuation callbackOnJobQueue ( + JobQueue& jobQueue, std::string const& name, JobType jobType) +{ + return Continuation ([name, jobType, &jobQueue] (Callback const& cb) { + jobQueue.addJob (jobType, name, [cb] (Job&) { cb(); }); + }); +} + } // RPC } // ripple diff --git a/src/ripple/rpc/tests/Coroutine.test.cpp b/src/ripple/rpc/tests/Coroutine.test.cpp index 8ba9645fb..ab42cf53c 100644 --- a/src/ripple/rpc/tests/Coroutine.test.cpp +++ b/src/ripple/rpc/tests/Coroutine.test.cpp @@ -30,41 +30,120 @@ class Coroutine_test : public TestOutputSuite public: using Strings = std::vector ; - void test (std::string const& name, int chunkSize, Strings const& expected) + void test (int chunkSize, Strings const& expected) { + auto name = std::to_string (chunkSize); setup (name); std::string buffer; Json::Output output = Json::stringOutput (buffer); - auto coroutine = Coroutine ([=] (Yield yield) - { - auto out = chunkedYieldingOutput (output, yield, chunkSize); - out ("hello "); - out ("there "); - out ("world."); - }); + auto makeContinuation = [&] (std::string const& data) { + return Continuation ([=] (Callback const& cb) { + output (data + " "); + cb(); + }); + }; Strings result; - while (coroutine) + SuspendCallback suspendCallback ([&] (Suspend const& suspend) { - coroutine(); + Callback yield = suspendForContinuation ( + suspend, makeContinuation ("*")); + auto out = chunkedYieldingOutput (output, yield, chunkSize); + out ("hello "); result.push_back (buffer); - } + suspend (makeContinuation("HELLO")); + result.push_back (buffer); + + out ("there "); + result.push_back (buffer); + + suspend (makeContinuation("THERE")); + result.push_back (buffer); + + out ("world "); + result.push_back (buffer); + + suspend (makeContinuation("WORLD")); + result.push_back (buffer); + }); + + Coroutine (suspendCallback).run(); expectCollectionEquals (result, expected); + + static + auto const printResults = false; + if (! printResults) + return; + + std::string indent1 = " "; + std::string indent2 = indent1 + " "; + + std::cerr << indent1 << "test (" + name + ", {"; + for (auto i = 0; i < result.size(); ++i) { + if (i) + std::cerr << ","; + std::cerr << "\n" << indent2; + std::cerr << '"' << result[i] << '"'; + } + std::cerr << "\n" << indent2 << "});\n"; + expect(true); } void run() override { - test ("zero", 0, {"hello ", "hello there ", "hello there world."}); - test ("three", 3, {"hello ", "hello there ", "hello there world."}); - test ("five", 5, {"hello ", "hello there ", "hello there world."}); - test ("seven", 7, {"hello there ", "hello there world."}); - test ("ten", 10, {"hello there ", "hello there world."}); - test ("thirteen", 13, {"hello there world."}); - test ("fifteen", 15, {"hello there world."}); - } + test (0, {"hello ", + "hello HELLO ", + "hello HELLO * there ", + "hello HELLO * there THERE ", + "hello HELLO * there THERE * world ", + "hello HELLO * there THERE * world WORLD " + }); + test (3, {"hello ", + "hello HELLO ", + "hello HELLO * there ", + "hello HELLO * there THERE ", + "hello HELLO * there THERE * world ", + "hello HELLO * there THERE * world WORLD " + }); + test (5, {"hello ", + "hello HELLO ", + "hello HELLO * there ", + "hello HELLO * there THERE ", + "hello HELLO * there THERE * world ", + "hello HELLO * there THERE * world WORLD " + }); + test (7, {"hello ", + "hello HELLO ", + "hello HELLO there ", + "hello HELLO there THERE ", + "hello HELLO there THERE * world ", + "hello HELLO there THERE * world WORLD " + }); + test (10, {"hello ", + "hello HELLO ", + "hello HELLO there ", + "hello HELLO there THERE ", + "hello HELLO there THERE * world ", + "hello HELLO there THERE * world WORLD " + }); + test (13, {"hello ", + "hello HELLO ", + "hello HELLO there ", + "hello HELLO there THERE ", + "hello HELLO there THERE world ", + "hello HELLO there THERE world WORLD " + }); + test (15, {"hello ", + "hello HELLO ", + "hello HELLO there ", + "hello HELLO there THERE ", + "hello HELLO there THERE world ", + "hello HELLO there THERE world WORLD " + }); + } }; BEAST_DEFINE_TESTSUITE(Coroutine, RPC, ripple); diff --git a/src/ripple/server/impl/ServerHandlerImp.cpp b/src/ripple/server/impl/ServerHandlerImp.cpp index 9030e316f..b1628ae52 100644 --- a/src/ripple/server/impl/ServerHandlerImp.cpp +++ b/src/ripple/server/impl/ServerHandlerImp.cpp @@ -64,6 +64,8 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent, , m_networkOPs (networkOPs) , m_server (HTTP::make_Server( *this, io_service, deprecatedLogs().journal("Server"))) + , m_continuation (RPC::callbackOnJobQueue ( + jobQueue, "RPC-Coroutine", jtCLIENT)) { auto const& group (cm.group ("rpc")); rpc_requests_ = group->make_counter ("requests"); @@ -156,27 +158,6 @@ Json::Output makeOutput (HTTP::Session& session) }; } -namespace { - -void runCoroutine (RPC::Coroutine coroutine, JobQueue& jobQueue) -{ - if (!coroutine) - return; - coroutine(); - if (!coroutine) - return; - - // Reschedule the job on the job queue. - jobQueue.addJob ( - jtCLIENT, "RPC-Coroutine", - [coroutine, &jobQueue] (Job&) - { - runCoroutine (coroutine, jobQueue); - }); -} - -} // namespace - void ServerHandlerImp::onRequest (HTTP::Session& session) { @@ -190,8 +171,8 @@ ServerHandlerImp::onRequest (HTTP::Session& session) } // Check user/password authorization - if (! authorized (session.port(), - build_map(session.request().headers))) + if (! authorized ( + session.port(), build_map(session.request().headers))) { HTTPReply (403, "Forbidden", makeOutput (session)); session.close (true); @@ -203,15 +184,20 @@ ServerHandlerImp::onRequest (HTTP::Session& session) if (setup_.yieldStrategy.useCoroutines == RPC::YieldStrategy::UseCoroutines::yes) { - RPC::Coroutine::YieldFunction yieldFunction = - [this, detach] (Yield const& y) { processSession (detach, y); }; - runCoroutine (RPC::Coroutine (yieldFunction), m_jobQueue); + RPC::SuspendCallback suspend ( + [this, detach] (RPC::Suspend const& suspend) { + processSession (detach, suspend); + }); + RPC::Coroutine coroutine (suspend); + coroutine.run(); } else { m_jobQueue.addJob ( jtCLIENT, "RPC-Client", - [=] (Job&) { processSession (detach, RPC::Yield{}); }); + [=] (Job&) { + processSession (detach, RPC::Suspend()); + }); } } @@ -229,21 +215,17 @@ ServerHandlerImp::onStopped (HTTP::Server&) //------------------------------------------------------------------------------ -// Dispatched on the job queue +// Run as a couroutine. void ServerHandlerImp::processSession ( - std::shared_ptr const& session, Yield const& yield) + std::shared_ptr const& session, Suspend const& suspend) { - auto output = makeOutput (*session); - if (auto byteYieldCount = setup_.yieldStrategy.byteYieldCount) - output = RPC::chunkedYieldingOutput (output, yield, byteYieldCount); - processRequest ( session->port(), to_string (session->body()), session->remoteAddress().at_port (0), - output, - yield); + makeOutput (*session), + suspend); if (session->request().keep_alive()) session->complete(); @@ -256,9 +238,18 @@ ServerHandlerImp::processRequest ( HTTP::Port const& port, std::string const& request, beast::IP::Endpoint const& remoteIPAddress, - Output output, - Yield yield) + Output&& output, + Suspend const& suspend) { + auto yield = RPC::suspendForContinuation (suspend, m_continuation); + + // Move off the webserver thread onto the JobQueue. + yield(); + assert (getApp().getJobQueue().getJobForThread()); + + if (auto count = setup_.yieldStrategy.byteYieldCount) + output = RPC::chunkedYieldingOutput (std::move (output), yield, count); + Json::Value jsonRPC; { Json::Reader reader; @@ -374,7 +365,9 @@ ServerHandlerImp::processRequest ( << "doRpcCommand:" << strMethod << ":" << params; auto const start (std::chrono::high_resolution_clock::now ()); - RPC::Context context {params, loadType, m_networkOPs, role, nullptr, yield}; + RPC::Context context { + params, loadType, m_networkOPs, role, nullptr, + std::move (suspend), std::move (yield)}; std::string response; if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes) @@ -641,7 +634,6 @@ to_Port(ParsedPort const& parsed, std::ostream& log) throw std::exception(); } p.port = *parsed.port; - if (parsed.admin_ip) p.admin_ip = *parsed.admin_ip; diff --git a/src/ripple/server/impl/ServerHandlerImp.h b/src/ripple/server/impl/ServerHandlerImp.h index 3906c5f5c..bd8ef4520 100644 --- a/src/ripple/server/impl/ServerHandlerImp.h +++ b/src/ripple/server/impl/ServerHandlerImp.h @@ -40,6 +40,7 @@ private: JobQueue& m_jobQueue; NetworkOPs& m_networkOPs; std::unique_ptr m_server; + RPC::Continuation m_continuation; Setup setup_; beast::insight::Counter rpc_requests_; beast::insight::Event rpc_io_; @@ -55,7 +56,7 @@ public: private: using Output = Json::Output; - using Yield = RPC::Yield; + using Suspend = RPC::Suspend; void setup (Setup const& setup, beast::Journal journal) override; @@ -107,11 +108,11 @@ private: //-------------------------------------------------------------------------- void - processSession (std::shared_ptr const&, Yield const&); + processSession (std::shared_ptr const&, Suspend const&); void processRequest (HTTP::Port const& port, std::string const& request, - beast::IP::Endpoint const& remoteIPAddress, Output, Yield); + beast::IP::Endpoint const& remoteIPAddress, Output&&, Suspend const&); // // PropertyStream