Implement new coroutines (RIPD-1043)

This commit is contained in:
Miguel Portilla
2015-10-19 10:45:41 -04:00
committed by Nik Bougalis
parent 880f354b90
commit 108906cb20
30 changed files with 1089 additions and 1462 deletions

View File

@@ -2090,6 +2090,10 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\core\Job.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\JobCoro.h">
</ClInclude>
<None Include="..\..\src\ripple\core\JobCoro.ipp">
</None>
<ClInclude Include="..\..\src\ripple\core\JobQueue.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\JobTypeData.h">
@@ -2112,6 +2116,12 @@
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<ClCompile Include="..\..\src\ripple\core\tests\Coroutine.test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='debug.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Configuration)|$(Platform)'=='release.classic|x64'">..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
</ClCompile>
<ClCompile Include="..\..\src\ripple\core\tests\LoadFeeTrack.test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -3153,8 +3163,6 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\Context.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\rpc\Coroutine.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\handlers\AccountCurrenciesHandler.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -3419,10 +3427,6 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\impl\Accounts.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\impl\Coroutine.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\rpc\impl\GetAccountObjects.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -3485,10 +3489,6 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\impl\Utilities.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\impl\Yield.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\InternalHandler.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\rpc\RipplePathFind.h">
@@ -3499,10 +3499,6 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\rpc\Status.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\tests\Coroutine.test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\rpc\tests\JSONRPC.test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -3517,12 +3513,6 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\tests\TestOutputSuite.test.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\tests\Yield.test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\Yield.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\server\Handler.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\server\Handoff.h">

View File

@@ -2799,6 +2799,12 @@
<ClInclude Include="..\..\src\ripple\core\Job.h">
<Filter>ripple\core</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\core\JobCoro.h">
<Filter>ripple\core</Filter>
</ClInclude>
<None Include="..\..\src\ripple\core\JobCoro.ipp">
<Filter>ripple\core</Filter>
</None>
<ClInclude Include="..\..\src\ripple\core\JobQueue.h">
<Filter>ripple\core</Filter>
</ClInclude>
@@ -2826,6 +2832,9 @@
<ClCompile Include="..\..\src\ripple\core\tests\Config.test.cpp">
<Filter>ripple\core\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\core\tests\Coroutine.test.cpp">
<Filter>ripple\core\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\core\tests\LoadFeeTrack.test.cpp">
<Filter>ripple\core\tests</Filter>
</ClCompile>
@@ -3843,9 +3852,6 @@
<ClInclude Include="..\..\src\ripple\rpc\Context.h">
<Filter>ripple\rpc</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\rpc\Coroutine.h">
<Filter>ripple\rpc</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\handlers\AccountCurrenciesHandler.cpp">
<Filter>ripple\rpc\handlers</Filter>
</ClCompile>
@@ -4053,9 +4059,6 @@
<ClInclude Include="..\..\src\ripple\rpc\impl\Accounts.h">
<Filter>ripple\rpc\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\impl\Coroutine.cpp">
<Filter>ripple\rpc\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\rpc\impl\GetAccountObjects.cpp">
<Filter>ripple\rpc\impl</Filter>
</ClCompile>
@@ -4116,9 +4119,6 @@
<ClInclude Include="..\..\src\ripple\rpc\impl\Utilities.h">
<Filter>ripple\rpc\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\impl\Yield.cpp">
<Filter>ripple\rpc\impl</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\InternalHandler.h">
<Filter>ripple\rpc</Filter>
</ClInclude>
@@ -4134,9 +4134,6 @@
<ClInclude Include="..\..\src\ripple\rpc\Status.h">
<Filter>ripple\rpc</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\tests\Coroutine.test.cpp">
<Filter>ripple\rpc\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\rpc\tests\JSONRPC.test.cpp">
<Filter>ripple\rpc\tests</Filter>
</ClCompile>
@@ -4149,12 +4146,6 @@
<ClInclude Include="..\..\src\ripple\rpc\tests\TestOutputSuite.test.h">
<Filter>ripple\rpc\tests</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\rpc\tests\Yield.test.cpp">
<Filter>ripple\rpc\tests</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\rpc\Yield.h">
<Filter>ripple\rpc</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\server\Handler.h">
<Filter>ripple\server</Filter>
</ClInclude>

View File

@@ -25,7 +25,6 @@
#include <ripple/basics/StringUtilities.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/protocol/STTx.h>
#include <ripple/rpc/Yield.h>
#include <ripple/json/Object.h>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -33,14 +32,9 @@ namespace ripple {
struct LedgerFill
{
LedgerFill (ReadView const& l,
int o = 0,
RPC::Callback const& y = {},
RPC::YieldStrategy const& ys = {})
: ledger (l),
options (o),
yield (y),
yieldStrategy (ys)
LedgerFill (ReadView const& l, int o = 0)
: ledger (l)
, options (o)
{
}
@@ -49,8 +43,6 @@ struct LedgerFill
ReadView const& ledger;
int options;
RPC::Callback yield;
RPC::YieldStrategy yieldStrategy;
};
/** Given a Ledger and options, fill a Json::Object or Json::Value with a

View File

@@ -88,15 +88,10 @@ void fillJsonTx (Object& json, LedgerFill const& fill)
auto bBinary = isBinary(fill);
auto bExpanded = isExpanded(fill);
RPC::CountedYield count (
fill.yieldStrategy.transactionYieldCount, fill.yield);
try
{
for (auto& i: fill.ledger.txs)
{
count.yield();
if (! bExpanded)
{
txns.append(to_string(i.first->getTransactionID()));
@@ -128,15 +123,11 @@ void fillJsonState(Object& json, LedgerFill const& fill)
{
auto& ledger = fill.ledger;
auto&& array = Json::setArray (json, jss::accountState);
RPC::CountedYield count (
fill.yieldStrategy.accountYieldCount, fill.yield);
auto expanded = isExpanded(fill);
auto binary = isBinary(fill);
for(auto const& sle : ledger.sles)
{
count.yield();
if (binary)
{
auto&& obj = appendObject(array);

View File

@@ -406,8 +406,9 @@ public:
// The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue.
//
, m_jobQueue (make_JobQueue (m_collectorManager->group ("jobq"),
m_nodeStoreScheduler, logs_->journal("JobQueue"), *logs_))
, m_jobQueue (std::make_unique<JobQueue>(
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_))
//
// Anything which calls addJob must be a descendant of the JobQueue

View File

@@ -95,9 +95,8 @@ void startServer (Application& app)
std::cerr << "Startup RPC: " << jvCommand << std::endl;
Resource::Charge loadType = Resource::feeReferenceRPC;
RPC::Context context {
app.journal ("RPCHandler"), jvCommand, app, loadType, app.getOPs (),
app.getLedgerMaster(), Role::ADMIN, app};
RPC::Context context {app.journal ("RPCHandler"), jvCommand, app,
loadType, app.getOPs (), app.getLedgerMaster(), Role::ADMIN};
Json::Value jvResult;
RPC::doCommand (context, jvResult);

View File

@@ -232,6 +232,15 @@ bool PathRequest::isValid (RippleLineCache::ref crCache)
return true;
}
/* If this is a normal path request, we want to run it once "fast" now
to give preliminary results.
If this is a legacy path request, we are only going to run it once,
and we can't run it in full now, so we don't want to run it at all.
If there's an error, we need to be sure to return it to the caller
in all cases.
*/
Json::Value PathRequest::doCreate (
RippleLineCache::ref& cache,
Json::Value const& value,
@@ -242,8 +251,8 @@ Json::Value PathRequest::doCreate (
if (parseJson (value) != PFR_PJ_INVALID)
{
valid = isValid (cache);
if (! hasCompletion())
status = valid ? doUpdate(cache, true) : jvStatus;
status = valid && ! hasCompletion()
? doUpdate(cache, true) : jvStatus;
}
else
{

79
src/ripple/core/JobCoro.h Normal file
View File

@@ -0,0 +1,79 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_CORE_JOBCORO_H_INCLUDED
#define RIPPLE_CORE_JOBCORO_H_INCLUDED
#include <ripple/core/Job.h>
#include <beast/win32_workaround.h>
#include <boost/coroutine/all.hpp>
#include <string>
#include <mutex>
namespace ripple {
class JobQueue;
namespace detail {
struct JobCoro_create_t { };
} // detail
class JobCoro : public std::enable_shared_from_this<JobCoro>
{
private:
JobQueue& jq_;
JobType type_;
std::string name_;
std::mutex mutex_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
public:
// Private: Used in the implementation
template <class F>
JobCoro (detail::JobCoro_create_t, JobQueue&, JobType,
std::string const&, F&&);
/** Suspend coroutine execution.
Effects:
The coroutine's stack is saved.
The associated Job thread is released.
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding post.
*/
void yield () const;
/** Schedule coroutine execution
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the statement
after the previous call to yield.
Undefined behavior if called consecutively without a corresponding yield.
*/
void post ();
};
} // ripple
#endif

View File

@@ -17,25 +17,48 @@
*/
//==============================================================================
#ifndef RIPPLE_RPC_COROUTINE_H_INCLUDED
#define RIPPLE_RPC_COROUTINE_H_INCLUDED
#include <ripple/rpc/Yield.h>
#ifndef RIPPLE_CORE_JOBCOROINL_H_INCLUDED
#define RIPPLE_CORE_JOBCOROINL_H_INCLUDED
namespace ripple {
namespace RPC {
/** Coroutine is a function that is given to the coroutine scheduler which
later gets called with a Suspend. A Coroutine can't be empty. */
using Coroutine = std::function <void (Suspend const&)>;
template <class F>
JobCoro::JobCoro (detail::JobCoro_create_t, JobQueue& jq, JobType type,
std::string const& name, F&& f)
: jq_(jq)
, type_(type)
, name_(name)
, coro_(
[this, fn = std::forward<F>(f)]
(boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield)
{
yield_ = &do_yield;
(*yield_)();
fn(shared_from_this());
}, boost::coroutines::attributes (1024 * 1024))
{
}
/** Run as a coroutine. */
void runOnCoroutine(Coroutine const&);
inline
void
JobCoro::yield () const
{
(*yield_)();
}
/** Run as coroutine if UseCoroutines::yes, otherwise run immediately. */
void runOnCoroutine(UseCoroutines, Coroutine const&);
inline
void
JobCoro::post ()
{
// sp keeps 'this' alive
jq_.addJob(type_, name_,
[this, sp = shared_from_this()](Job&)
{
std::lock_guard<std::mutex> lock (mutex_);
coro_();
});
}
} // RPC
} // ripple
#endif

View File

@@ -21,70 +21,277 @@
#define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#include <ripple/core/JobTypes.h>
#include <ripple/core/JobTypeData.h>
#include <ripple/core/JobCoro.h>
#include <ripple/json/json_value.h>
#include <beast/insight/Collector.h>
#include <beast/threads/Stoppable.h>
#include <beast/module/core/thread/Workers.h>
#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <thread>
#include <set>
namespace ripple {
class Logs;
class JobQueue : public beast::Stoppable
class JobQueue
: public beast::Stoppable
, private beast::Workers::Callback
{
protected:
JobQueue (char const* name, Stoppable& parent);
public:
using JobFunction = std::function <void(Job&)>;
virtual ~JobQueue () { }
virtual void addJob (
JobType, std::string const& name, JobFunction const&) = 0;
JobQueue (beast::insight::Collector::ptr const& collector,
Stoppable& parent, beast::Journal journal, Logs& logs);
~JobQueue ();
// Jobs waiting at this priority
virtual int getJobCount (JobType t) const = 0;
void addJob (JobType type, std::string const& name, JobFunction const& func);
// Jobs waiting plus running at this priority
virtual int getJobCountTotal (JobType t) const = 0;
/** Creates a coroutine and adds a job to the queue which will run it.
// All waiting jobs at or greater than this priority
virtual int getJobCountGE (JobType t) const = 0;
@param t The type of job.
@param name Name of the job.
@param f Has a signature of void(std::shared_ptr<JobCoro>). Called when the job executes.
*/
template <class F>
void postCoro (JobType t, std::string const& name, F&& f);
virtual void shutdown () = 0;
/** Jobs waiting at this priority.
*/
int getJobCount (JobType t) const;
virtual void setThreadCount (int c, bool const standaloneMode) = 0;
/** Jobs waiting plus running at this priority.
*/
int getJobCountTotal (JobType t) const;
/** All waiting jobs at or greater than this priority.
*/
int getJobCountGE (JobType t) const;
/** Shut down the job queue without completing pending jobs.
*/
void shutdown ();
/** Set the number of thread serving the job queue to precisely this number.
*/
void setThreadCount (int c, bool const standaloneMode);
// VFALCO TODO Rename these to newLoadEventMeasurement or something similar
// since they create the object.
//
virtual LoadEvent::pointer getLoadEvent (
JobType t, std::string const& name) = 0;
LoadEvent::pointer getLoadEvent (JobType t, std::string const& name);
// VFALCO TODO Why do we need two versions, one which returns a shared
// pointer and the other which returns an autoptr?
//
virtual LoadEvent::autoptr getLoadEventAP (
JobType t, std::string const& name) = 0;
LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name);
// Add multiple load events
virtual void addLoadEvents (
JobType t, int count, std::chrono::milliseconds elapsed) = 0;
/** Add multiple load events.
*/
void addLoadEvents (JobType t, int count, std::chrono::milliseconds elapsed);
virtual bool isOverloaded () = 0;
// Cannot be const because LoadMonitor has no const methods.
bool isOverloaded ();
/** Get the Job corresponding to a thread. If no thread, use the current
thread. */
virtual Job* getJobForThread (std::thread::id const& id = {}) const = 0;
Job* getJobForThread(std::thread::id const& id = {}) const;
virtual Json::Value getJson (int c = 0) = 0;
// Cannot be const because LoadMonitor has no const methods.
Json::Value getJson (int c = 0);
private:
using JobDataMap = std::map <JobType, JobTypeData>;
beast::Journal m_journal;
mutable std::mutex m_mutex;
std::uint64_t m_lastJob;
std::set <Job> m_jobSet;
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
std::map <std::thread::id, Job*> m_threadIds;
// The number of jobs currently in processTask()
int m_processCount;
beast::Workers m_workers;
Job::CancelCallback m_cancelCallback;
// Statistics tracking
beast::insight::Collector::ptr m_collector;
beast::insight::Gauge job_count;
beast::insight::Hook hook;
static JobTypes const& getJobTypes()
{
static JobTypes types;
return types;
}
void collect();
JobTypeData& getJobTypeData (JobType type);
// Signals the service stopped if the stopped condition is met.
void checkStopped (std::lock_guard <std::mutex> const& lock);
// Signals an added Job for processing.
//
// Pre-conditions:
// The JobType must be valid.
// The Job must exist in mJobSet.
// The Job must not have previously been queued.
//
// Post-conditions:
// Count of waiting jobs of that type will be incremented.
// If JobQueue exists, and has at least one thread, Job will eventually run.
//
// Invariants:
// The calling thread owns the JobLock
void queueJob (Job const& job, std::lock_guard <std::mutex> const& lock);
// Returns the next Job we should run now.
//
// RunnableJob:
// A Job in the JobSet whose slots count for its type is greater than zero.
//
// Pre-conditions:
// mJobSet must not be empty.
// mJobSet holds at least one RunnableJob
//
// Post-conditions:
// job is a valid Job object.
// job is removed from mJobQueue.
// Waiting job count of its type is decremented
// Running job count of its type is incremented
//
// Invariants:
// The calling thread owns the JobLock
void getNextJob (Job& job);
// Indicates that a running Job has completed its task.
//
// Pre-conditions:
// Job must not exist in mJobSet.
// The JobType must not be invalid.
//
// Post-conditions:
// The running count of that JobType is decremented
// A new task is signaled if there are more waiting Jobs than the limit, if any.
//
// Invariants:
// <none>
void finishJob (Job const& job);
template <class Rep, class Period>
void on_dequeue (JobType type,
std::chrono::duration <Rep, Period> const& value);
template <class Rep, class Period>
void on_execute (JobType type,
std::chrono::duration <Rep, Period> const& value);
// Runs the next appropriate waiting Job.
//
// Pre-conditions:
// A RunnableJob must exist in the JobSet
//
// Post-conditions:
// The chosen RunnableJob will have Job::doJob() called.
//
// Invariants:
// <none>
void processTask () override;
// Returns `true` if all jobs of this type should be skipped when
// the JobQueue receives a stop notification. If the job type isn't
// skipped, the Job will be called and the job must call Job::shouldCancel
// to determine if a long running or non-mandatory operation should be canceled.
bool skipOnStop (JobType type);
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
int getJobLimit (JobType type);
void onStop () override;
void onChildrenStopped () override;
};
std::unique_ptr <JobQueue>
make_JobQueue (beast::insight::Collector::ptr const& collector,
beast::Stoppable& parent, beast::Journal journal, Logs& logs);
/*
An RPC command is received and is handled via ServerHandler(HTTP) or
Handler(websocket), depending on the connection type. The handler then calls
the JobQueue::postCoro() method to create a coroutine and run it at a later
point. This frees up the handler thread and allows it to continue handling
other requests while the RPC command completes its work asynchronously.
postCoro() creates a JobCoro object. When the JobCoro ctor is called, and its
coro_ member is initialized(a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note here
that construction of a boost pull_type automatically passes execution to the
coroutine. A pull_type object automatically generates a push_type that is
used as the as a parameter(do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, JobCoro::yield_ is assigned the push_type
parameter(do_yield) address and called(yield()) so we can return execution
back to the caller's stack.
postCoro() then calls JobCoro::post(), which schedules a job on the job
queue to continue execution of the coroutine in a JobQueue worker thread at
some later time. When the job runs, we lock on the JobCoro::mutex_ and call
coro_ which continues where we had left off. Since we the last thing we did
in coro_ was call yield(), the next thing we continue with is calling the
function param f, that was passed into JobCoro ctor. It is within this
function body that the caller specifies what he would like to do while
running in the coroutine and allow them to suspend and resume execution.
A task that relies on other events to complete, such as path finding, calls
JobCoro::yield() to suspend its execution while waiting on those events to
complete and continue when signaled via the JobCoro::post() method.
There is a potential race condition that exists here where post() can get
called before yield() after f is called. Technically the problem only occurs
if the job that post() scheduled is executed before yield() is called.
If the post() job were to be executed before yield(), undefined behavior
would occur. The lock ensures that coro_ is not called again until we exit
the coroutine. At which point a scheduled resume() job waiting on the lock
would gain entry, harmlessly call coro_ and immediately return as we have
already completed the coroutine.
The race condition occurs as follows:
1- The coroutine is running.
2- The coroutine is about to suspend, but before it can do so, it must
arrange for some event to wake it up.
3- The coroutine arranges for some event to wake it up.
4- Before the coroutine can suspend, that event occurs and the resumption
of the coroutine is scheduled on the job queue.
5- Again, before the coroutine can suspend, the resumption of the coroutine
is dispatched.
6- Again, before the coroutine can suspend, the resumption code runs the
coroutine.
The coroutine is now running in two threads.
The lock prevents this from happening as step 6 will block until the
lock is released which only happens after the coroutine completes.
*/
} // ripple
#include <ripple/core/JobCoro.ipp>
namespace ripple {
template <class F>
void JobQueue::postCoro (JobType t, std::string const& name, F&& f)
{
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<JobCoro>).
*/
auto const coro = std::make_shared<JobCoro>(
detail::JobCoro_create_t{}, *this, t, name, std::forward<F>(f));
coro->post();
}
}

View File

@@ -22,6 +22,7 @@
#include <ripple/basics/Log.h>
#include <ripple/core/JobTypeInfo.h>
#include <beast/insight/Collector.h>
namespace ripple
{

View File

@@ -32,48 +32,9 @@
namespace ripple {
class JobQueueImp
: public JobQueue
, private beast::Workers::Callback
{
public:
using JobSet = std::set <Job>;
using JobDataMap = std::map <JobType, JobTypeData>;
using ScopedLock = std::lock_guard <std::mutex>;
using ThreadIdMap = std::map <std::thread::id, Job*>;
beast::Journal m_journal;
mutable std::mutex m_mutex;
std::uint64_t m_lastJob;
JobSet m_jobSet;
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
ThreadIdMap m_threadIds;
// The number of jobs currently in processTask()
int m_processCount;
beast::Workers m_workers;
Job::CancelCallback m_cancelCallback;
// statistics tracking
beast::insight::Collector::ptr m_collector;
beast::insight::Gauge job_count;
beast::insight::Hook hook;
//--------------------------------------------------------------------------
static JobTypes const& getJobTypes ()
{
static JobTypes types;
return types;
}
//--------------------------------------------------------------------------
JobQueueImp (beast::insight::Collector::ptr const& collector,
JobQueue::JobQueue (beast::insight::Collector::ptr const& collector,
Stoppable& parent, beast::Journal journal, Logs& logs)
: JobQueue ("JobQueue", parent)
: Stoppable ("JobQueue", parent)
, m_journal (journal)
, m_lastJob (0)
, m_invalidJobData (getJobTypes ().getInvalid (), collector, logs)
@@ -82,12 +43,11 @@ public:
, m_cancelCallback (std::bind (&Stoppable::isStopping, this))
, m_collector (collector)
{
hook = m_collector->make_hook (std::bind (
&JobQueueImp::collect, this));
hook = m_collector->make_hook (std::bind (&JobQueue::collect, this));
job_count = m_collector->make_gauge ("job_count");
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
for (auto const& x : getJobTypes ())
{
@@ -103,26 +63,27 @@ public:
}
}
~JobQueueImp () override
JobQueue::~JobQueue ()
{
// Must unhook before destroying
hook = beast::insight::Hook ();
}
void collect ()
void
JobQueue::collect ()
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
job_count = m_jobSet.size ();
}
void addJob (
JobType type, std::string const& name, JobFunction const& func) override
void
JobQueue::addJob (JobType type, std::string const& name,
JobFunction const& func)
{
assert (type != jtINVALID);
JobDataMap::iterator iter (m_jobData.find (type));
auto iter (m_jobData.find (type));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return;
@@ -144,7 +105,7 @@ public:
// OR
// * Not all children are stopped
//
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
assert (! isStopped() && (
m_processCount>0 ||
! m_jobSet.empty () ||
@@ -162,7 +123,7 @@ public:
}
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
std::pair <std::set <Job>::iterator, bool> result (
m_jobSet.insert (Job (type, name, ++m_lastJob,
@@ -171,9 +132,10 @@ public:
}
}
int getJobCount (JobType t) const override
int
JobQueue::getJobCount (JobType t) const
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
JobDataMap::const_iterator c = m_jobData.find (t);
@@ -182,9 +144,10 @@ public:
: c->second.waiting;
}
int getJobCountTotal (JobType t) const override
int
JobQueue::getJobCountTotal (JobType t) const
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
JobDataMap::const_iterator c = m_jobData.find (t);
@@ -193,12 +156,13 @@ public:
: (c->second.waiting + c->second.running);
}
int getJobCountGE (JobType t) const override
int
JobQueue::getJobCountGE (JobType t) const
{
// return the number of jobs at this priority level or greater
int ret = 0;
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
for (auto const& x : m_jobData)
{
@@ -209,17 +173,16 @@ public:
return ret;
}
// shut down the job queue without completing pending jobs
//
void shutdown () override
void
JobQueue::shutdown ()
{
m_journal.info << "Job queue shutting down";
m_workers.pauseAllThreadsAndWait ();
}
// set the number of thread serving the job queue to precisely this number
void setThreadCount (int c, bool const standaloneMode) override
void
JobQueue::setThreadCount (int c, bool const standaloneMode)
{
if (standaloneMode)
{
@@ -237,8 +200,8 @@ public:
m_workers.setNumberOfThreads (c);
}
LoadEvent::pointer getLoadEvent (
JobType t, std::string const& name) override
LoadEvent::pointer
JobQueue::getLoadEvent (JobType t, std::string const& name)
{
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
@@ -250,8 +213,8 @@ public:
std::ref (iter-> second.load ()), name, true);
}
LoadEvent::autoptr getLoadEventAP (
JobType t, std::string const& name) override
LoadEvent::autoptr
JobQueue::getLoadEventAP (JobType t, std::string const& name)
{
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
@@ -262,16 +225,17 @@ public:
return std::make_unique<LoadEvent> (iter-> second.load (), name, true);
}
void addLoadEvents (JobType t,
int count, std::chrono::milliseconds elapsed) override
void
JobQueue::addLoadEvents (JobType t, int count,
std::chrono::milliseconds elapsed)
{
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
iter->second.load().addSamples (count, elapsed);
}
// Cannot be const because LoadMonitor has no const methods.
bool isOverloaded () override
bool
JobQueue::isOverloaded ()
{
int count = 0;
@@ -284,8 +248,8 @@ public:
return count > 0;
}
// Cannot be const because LoadMonitor has no const methods.
Json::Value getJson (int) override
Json::Value
JobQueue::getJson (int c)
{
Json::Value ret (Json::objectValue);
@@ -293,7 +257,7 @@ public:
Json::Value priorities = Json::arrayValue;
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
for (auto& x : m_jobData)
{
@@ -341,18 +305,18 @@ public:
return ret;
}
Job* getJobForThread (std::thread::id const& id) const override
Job*
JobQueue::getJobForThread (std::thread::id const& id) const
{
auto tid = (id == std::thread::id()) ? std::this_thread::get_id() : id;
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
auto i = m_threadIds.find (tid);
return (i == m_threadIds.end()) ? nullptr : i->second;
}
private:
//--------------------------------------------------------------------------
JobTypeData& getJobTypeData (JobType type)
JobTypeData&
JobQueue::getJobTypeData (JobType type)
{
JobDataMap::iterator c (m_jobData.find (type));
assert (c != m_jobData.end ());
@@ -365,11 +329,8 @@ private:
return c->second;
}
//--------------------------------------------------------------------------
// Signals the service stopped if the stopped condition is met.
//
void checkStopped (ScopedLock const& lock)
void
JobQueue::checkStopped (std::lock_guard <std::mutex> const& lock)
{
// We are stopped when all of the following are true:
//
@@ -387,23 +348,8 @@ private:
}
}
//--------------------------------------------------------------------------
//
// Signals an added Job for processing.
//
// Pre-conditions:
// The JobType must be valid.
// The Job must exist in mJobSet.
// The Job must not have previously been queued.
//
// Post-conditions:
// Count of waiting jobs of that type will be incremented.
// If JobQueue exists, and has at least one thread, Job will eventually run.
//
// Invariants:
// The calling thread owns the JobLock
//
void queueJob (Job const& job, ScopedLock const& lock)
void
JobQueue::queueJob (Job const& job, std::lock_guard <std::mutex> const& lock)
{
JobType const type (job.getType ());
assert (type != jtINVALID);
@@ -424,31 +370,12 @@ private:
++data.waiting;
}
//------------------------------------------------------------------------------
//
// Returns the next Job we should run now.
//
// RunnableJob:
// A Job in the JobSet whose slots count for its type is greater than zero.
//
// Pre-conditions:
// mJobSet must not be empty.
// mJobSet holds at least one RunnableJob
//
// Post-conditions:
// job is a valid Job object.
// job is removed from mJobQueue.
// Waiting job count of its type is decremented
// Running job count of its type is incremented
//
// Invariants:
// The calling thread owns the JobLock
//
void getNextJob (Job& job)
void
JobQueue::getNextJob (Job& job)
{
assert (! m_jobSet.empty ());
JobSet::const_iterator iter;
std::set <Job>::const_iterator iter;
for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter)
{
JobTypeData& data (getJobTypeData (iter->getType ()));
@@ -479,22 +406,8 @@ private:
++data.running;
}
//------------------------------------------------------------------------------
//
// Indicates that a running Job has completed its task.
//
// Pre-conditions:
// Job must not exist in mJobSet.
// The JobType must not be invalid.
//
// Post-conditions:
// The running count of that JobType is decremented
// A new task is signaled if there are more waiting Jobs than the limit, if any.
//
// Invariants:
// <none>
//
void finishJob (Job const& job)
void
JobQueue::finishJob (Job const& job)
{
JobType const type = job.getType ();
@@ -519,9 +432,8 @@ private:
--data.running;
}
//--------------------------------------------------------------------------
template <class Rep, class Period>
void on_dequeue (JobType type,
void JobQueue::on_dequeue (JobType type,
std::chrono::duration <Rep, Period> const& value)
{
auto const ms (ceil <std::chrono::milliseconds> (value));
@@ -531,7 +443,7 @@ private:
}
template <class Rep, class Period>
void on_execute (JobType type,
void JobQueue::on_execute (JobType type,
std::chrono::duration <Rep, Period> const& value)
{
auto const ms (ceil <std::chrono::milliseconds> (value));
@@ -540,25 +452,13 @@ private:
getJobTypeData (type).execute.notify (ms);
}
//--------------------------------------------------------------------------
//
// Runs the next appropriate waiting Job.
//
// Pre-conditions:
// A RunnableJob must exist in the JobSet
//
// Post-conditions:
// The chosen RunnableJob will have Job::doJob() called.
//
// Invariants:
// <none>
//
void processTask () override
void
JobQueue::processTask ()
{
Job job;
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
getNextJob (job);
++m_processCount;
}
@@ -586,7 +486,7 @@ private:
}
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
finishJob (job);
--m_processCount;
checkStopped (lock);
@@ -596,13 +496,8 @@ private:
// to the associated LoadEvent object (in the Job) may be destroyed.
}
//------------------------------------------------------------------------------
// Returns `true` if all jobs of this type should be skipped when
// the JobQueue receives a stop notification. If the job type isn't
// skipped, the Job will be called and the job must call Job::shouldCancel
// to determine if a long running or non-mandatory operation should be canceled.
bool skipOnStop (JobType type)
bool
JobQueue::skipOnStop (JobType type)
{
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
@@ -610,11 +505,8 @@ private:
return j.skip ();
}
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
//
int getJobLimit (JobType type)
int
JobQueue::getJobLimit (JobType type)
{
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
@@ -622,9 +514,8 @@ private:
return j.limit ();
}
//--------------------------------------------------------------------------
void onStop () override
void
JobQueue::onStop ()
{
// VFALCO NOTE I wanted to remove all the jobs that are skippable
// but then the Workers count of tasks to process
@@ -632,14 +523,14 @@ private:
/*
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
// Remove all jobs whose type is skipOnStop
using JobDataMap = hash_map <JobType, std::size_t>;
JobDataMap counts;
bool const report (m_journal.debug.active());
for (JobSet::const_iterator iter (m_jobSet.begin());
for (std::set <Job>::const_iterator iter (m_jobSet.begin());
iter != m_jobSet.end();)
{
if (skipOnStop (iter->getType()))
@@ -676,28 +567,11 @@ private:
*/
}
void onChildrenStopped () override
void
JobQueue::onChildrenStopped ()
{
ScopedLock lock (m_mutex);
std::lock_guard <std::mutex> lock (m_mutex);
checkStopped (lock);
}
};
//------------------------------------------------------------------------------
JobQueue::JobQueue (char const* name, Stoppable& parent)
: Stoppable (name, parent)
{
}
//------------------------------------------------------------------------------
std::unique_ptr <JobQueue> make_JobQueue (
beast::insight::Collector::ptr const& collector,
beast::Stoppable& parent, beast::Journal journal, Logs& logs)
{
return std::make_unique <JobQueueImp> (collector, parent, journal, logs);
}
}

View File

@@ -0,0 +1,117 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/core/JobCoro.h>
#include <ripple/core/JobQueue.h>
#include <ripple/test/jtx.h>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
namespace ripple {
namespace test {
class Coroutine_test : public beast::unit_test::suite
{
public:
void
test_coroutine()
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
std::atomic<int> i{0};
std::condition_variable cv;
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
jq.postCoro(jtCLIENT, "Coroutine-Test",
[&](std::shared_ptr<JobCoro> jc)
{
std::thread t(
[&i, jc]()
{
std::this_thread::sleep_for(20ms);
++i;
jc->post();
});
jc->yield();
t.join();
++i;
cv.notify_one();
});
{
std::mutex m;
std::unique_lock<std::mutex> lk(m);
expect(cv.wait_for(lk, 1s,
[&]()
{
return i == 2;
}));
}
jq.shutdown();
expect(i == 2);
}
void
test_incorrect_order()
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
std::atomic<int> i{0};
std::condition_variable cv;
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
jq.postCoro(jtCLIENT, "Coroutine-Test",
[&](std::shared_ptr<JobCoro> jc)
{
jc->post();
jc->yield();
++i;
cv.notify_one();
});
{
std::mutex m;
std::unique_lock<std::mutex> lk(m);
expect(cv.wait_for(lk, 1s,
[&]()
{
return i == 1;
}));
}
jq.shutdown();
expect(i == 1);
}
void
run()
{
test_coroutine();
test_incorrect_order();
}
};
BEAST_DEFINE_TESTSUITE(Coroutine,core,ripple);
} // test
} // ripple

View File

@@ -21,8 +21,8 @@
#define RIPPLE_RPC_CONTEXT_H_INCLUDED
#include <ripple/core/Config.h>
#include <ripple/core/JobCoro.h>
#include <ripple/net/InfoSub.h>
#include <ripple/rpc/Yield.h>
#include <ripple/server/Role.h>
#include <ripple/nodestore/ScopedMetrics.h>
@@ -46,7 +46,7 @@ struct Context
NetworkOPs& netOps;
LedgerMaster& ledgerMaster;
Role role;
JobQueueSuspender suspend;
std::shared_ptr<JobCoro> jobCoro;
InfoSub::pointer infoSub;
NodeStore::ScopedMetrics metrics;
};
@@ -54,6 +54,4 @@ struct Context
} // RPC
} // ripple
#endif

View File

@@ -29,13 +29,12 @@ namespace ripple {
namespace RPC {
struct Context;
struct YieldStrategy;
/** Execute an RPC command and store the results in a Json::Value. */
Status doCommand (RPC::Context&, Json::Value&, YieldStrategy const& s = {});
Status doCommand (RPC::Context&, Json::Value&);
/** Execute an RPC command and store the results in an std::string. */
void executeRPC (RPC::Context&, std::string&, YieldStrategy const& s = {});
void executeRPC (RPC::Context&, std::string&);
Role roleRequired (std::string const& method );

View File

@@ -1,135 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_RPC_YIELD_H_INCLUDED
#define RIPPLE_RPC_YIELD_H_INCLUDED
#include <ripple/core/JobQueue.h>
#include <ripple/json/Output.h>
#include <beast/win32_workaround.h>
#include <boost/coroutine/all.hpp>
#include <functional>
namespace ripple {
class Application;
class BasicConfig;
class JobQueue;
class Section;
namespace RPC {
/** See the README.md in this directory for more information about how
the RPC yield mechanism works.
*/
/** Callback: do something and eventually return. Can't be empty. */
using Callback = std::function <void ()>;
/** Continuation: do something, guarantee to eventually call Callback.
Can't be empty. */
using Continuation = std::function <void (Callback const&)>;
/** Suspend: suspend execution, pending completion of a Continuation.
Can't be empty. */
using Suspend = std::function <void (Continuation const&)>;
/** A non-empty Suspend that immediately calls its callback. */
extern
Suspend const dontSuspend;
/** Wrap an Output so it yields after approximately `chunkSize` bytes.
chunkedYieldingOutput() only yields after a call to output(), so there might
more than chunkSize bytes sent between calls to yield().
chunkedYieldingOutput() also only yields before it's about to output more
data. This is to avoid the case where you yield after outputting data, but
then never send more data.
*/
Json::Output chunkedYieldingOutput (
Json::Output const&, Callback const&, std::size_t chunkSize);
/** Yield every yieldCount calls. If yieldCount is 0, never yield. */
class CountedYield
{
public:
CountedYield (std::size_t yieldCount, Callback const& yield);
void yield();
private:
std::size_t count_ = 0;
std::size_t const yieldCount_;
Callback const yield_;
};
enum class UseCoroutines {no, yes};
/** When do we yield when performing a ledger computation? */
struct YieldStrategy
{
enum class Streaming {no, yes};
/** Is the data streamed, or generated monolithically? */
Streaming streaming = Streaming::no;
/** Are results generated in a coroutine? If this is no, then the code can
never yield. */
UseCoroutines useCoroutines = UseCoroutines::no;
/** How many accounts do we process before yielding? 0 means "never yield
due to number of accounts processed." */
std::size_t accountYieldCount = 0;
/** How many transactions do we process before yielding? 0 means "never
yield due to number of transactions processed." */
std::size_t transactionYieldCount = 0;
};
/** Does a BasicConfig require the use of coroutines? */
UseCoroutines useCoroutines(BasicConfig const&);
/** Create a yield strategy from a BasicConfig. */
YieldStrategy makeYieldStrategy(BasicConfig const&);
/** JobQueueSuspender is a suspend, with a yield that reschedules the job
on the job queue. */
struct JobQueueSuspender
{
/** Possibly suspend current execution. */
Suspend const suspend;
/** Possibly yield and restart on the job queue. */
Callback const yield;
/** Create a JobQueueSuspender where yield does nothing and the suspend
immediately executes the continuation. */
JobQueueSuspender(Application&);
/** Create a JobQueueSuspender with a Suspend.
When yield is called, it reschedules the current job on the JobQueue
with the given jobName. */
JobQueueSuspender(Application&, Suspend const&, std::string const& jobName);
};
} // RPC
} // ripple
#endif

View File

@@ -29,7 +29,6 @@
#include <ripple/protocol/JsonFields.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Status.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/impl/Handler.h>
#include <ripple/server/Role.h>
@@ -87,22 +86,21 @@ private:
template <class Object>
void LedgerHandler::writeResult (Object& value)
{
auto& yield = context_.suspend.yield;
if (ledger_)
{
Json::copyFrom (value, result_);
addJson (value, {*ledger_, options_, yield});
addJson (value, {*ledger_, options_});
}
else
{
auto& master = context_.app.getLedgerMaster ();
{
auto&& closed = Json::addObject (value, jss::closed);
addJson (closed, {*master.getClosedLedger(), 0, yield});
addJson (closed, {*master.getClosedLedger(), 0});
}
{
auto&& open = Json::addObject (value, jss::open);
addJson (open, {*master.getCurrentLedger(), 0, yield});
addJson (open, {*master.getCurrentLedger(), 0});
}
}
}

View File

@@ -73,10 +73,6 @@ Json::Value doRipplePathFind (RPC::Context& context)
if (context.app.config().PATH_SEARCH_MAX == 0)
return rpcError (rpcNOT_SUPPORTED);
RPC::LegacyPathFind lpf (context.role == Role::ADMIN, context.app);
if (!lpf.isOk ())
return rpcError (rpcTOO_BUSY);
context.loadType = Resource::feeHighBurdenRPC;
AccountID raSrc;
@@ -86,8 +82,7 @@ Json::Value doRipplePathFind (RPC::Context& context)
Json::Value jvResult;
if (true || // TODO MPORTILLA temp fix to disable broken websocket coroutines
context.app.config().RUN_STANDALONE ||
if (context.app.config().RUN_STANDALONE ||
context.params.isMember(jss::ledger) ||
context.params.isMember(jss::ledger_index) ||
context.params.isMember(jss::ledger_hash))
@@ -105,27 +100,26 @@ Json::Value doRipplePathFind (RPC::Context& context)
return rpcError (rpcNO_NETWORK);
}
PathRequest::pointer request;
context.loadType = Resource::feeHighBurdenRPC;
lpLedger = context.ledgerMaster.getClosedLedger();
PathRequest::pointer request;
context.suspend.suspend(
[&request, &context, &jvResult, &lpLedger]
(RPC::Callback const& callback)
{
jvResult = context.app.getPathRequests().makeLegacyPathRequest (
request, callback, lpLedger, context.params);
assert(callback);
if (! request && callback)
callback();
});
request, std::bind(&JobCoro::post, context.jobCoro),
lpLedger, context.params);
if (request)
{
context.jobCoro->yield();
jvResult = request->doStatus (context.params);
}
return jvResult;
}
RPC::LegacyPathFind lpf (context.role == Role::ADMIN, context.app);
if (! lpf.isOk ())
return rpcError (rpcTOO_BUSY);
if (! context.params.isMember (jss::source_account))
{
jvResult = rpcError (rpcSRC_ACT_MISSING);

View File

@@ -1,82 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/rpc/Coroutine.h>
#include <ripple/rpc/tests/TestOutputSuite.test.h>
#include <iostream>
namespace ripple {
namespace RPC {
namespace {
using CoroutineType = Continuation;
using BoostCoroutine = boost::coroutines::asymmetric_coroutine<CoroutineType>;
using Pull = BoostCoroutine::pull_type;
using Push = BoostCoroutine::push_type;
void runOnCoroutineImpl(std::shared_ptr<Pull> pull)
{
while (*pull)
{
(*pull)();
if (! *pull)
return;
if (auto continuation = pull->get())
{
continuation ([pull] () { runOnCoroutineImpl(pull); });
return;
}
}
}
} // namespace
void runOnCoroutine(Coroutine const& coroutine)
{
auto pullFunction = [coroutine] (Push& push)
{
Suspend suspend = [&push] (CoroutineType const& cbc)
{
if (push)
push (cbc);
};
// Run once doing nothing, to get the other side started.
suspend([] (Callback const& callback) { callback(); });
// Now run the coroutine.
coroutine(suspend);
};
runOnCoroutineImpl(std::make_shared<Pull>(pullFunction));
}
void runOnCoroutine(UseCoroutines useCoroutines, Coroutine const& coroutine)
{
if (useCoroutines == UseCoroutines::yes)
runOnCoroutine(coroutine);
else
coroutine(dontSuspend);
}
} // RPC
} // ripple

View File

@@ -20,7 +20,6 @@
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/impl/Tuning.h>
#include <ripple/rpc/impl/Handler.h>
#include <ripple/app/main/Application.h>
@@ -223,7 +222,7 @@ void getResult (
} // namespace
Status doCommand (
RPC::Context& context, Json::Value& result, YieldStrategy const&)
RPC::Context& context, Json::Value& result)
{
boost::optional <Handler const&> handler;
if (auto error = fillHandler (context, handler))
@@ -240,7 +239,7 @@ Status doCommand (
/** Execute an RPC command and store the results in a string. */
void executeRPC (
RPC::Context& context, std::string& output, YieldStrategy const& strategy)
RPC::Context& context, std::string& output)
{
boost::optional <Handler const&> handler;
if (auto error = fillHandler (context, handler))
@@ -258,9 +257,6 @@ void executeRPC (
{
auto object = Json::Value (Json::objectValue);
getResult (context, method, object, handler->name_);
if (strategy.streaming == YieldStrategy::Streaming::yes)
output = jsonAsString (object);
else
output = to_string (object);
}
else

View File

@@ -1,131 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/tests/TestOutputSuite.test.h>
namespace ripple {
namespace RPC {
static
UseCoroutines defaultUseCoroutines = UseCoroutines::no;
Suspend const dontSuspend = [] (Continuation const& continuation)
{
continuation([] () {});
};
namespace {
void runOnJobQueue(
Application& app, std::string const& name, Callback const& callback)
{
auto cb = [callback] (Job&) { callback(); };
app.getJobQueue().addJob(jtCLIENT, name, cb);
};
Callback suspendForJobQueue(
Application& app, Suspend const& suspend, std::string const& jobName)
{
assert(suspend);
return Callback( [suspend, jobName, &app] () {
suspend([jobName, &app] (Callback const& callback) {
runOnJobQueue(app, jobName, callback);
});
});
}
} // namespace
Json::Output chunkedYieldingOutput (
Json::Output const& output, Callback const& yield, std::size_t chunkSize)
{
if (!yield)
return output;
auto count = std::make_shared <std::size_t> (0);
return [chunkSize, count, output, yield] (boost::string_ref const& bytes)
{
if (*count > chunkSize)
{
yield();
*count = 0;
}
output (bytes);
*count += bytes.size();
};
}
CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield)
: yieldCount_ (yieldCount), yield_ (yield)
{
}
void CountedYield::yield()
{
if (yieldCount_ && yield_)
{
if (++count_ >= yieldCount_)
{
yield_();
count_ = 0;
}
}
}
UseCoroutines useCoroutines(BasicConfig const& config)
{
if (auto use = config["section"].get<bool>("use_coroutines"))
return *use ? UseCoroutines::yes : UseCoroutines::no;
return defaultUseCoroutines;
}
YieldStrategy makeYieldStrategy (BasicConfig const& config)
{
auto s = config["section"];
YieldStrategy ys;
ys.streaming = get<bool> (s, "streaming") ?
YieldStrategy::Streaming::yes :
YieldStrategy::Streaming::no;
ys.useCoroutines = useCoroutines(config);
ys.accountYieldCount = get<std::size_t> (s, "account_yield_count");
ys.transactionYieldCount = get<std::size_t> (s, "transaction_yield_count");
return ys;
}
JobQueueSuspender::JobQueueSuspender(
Application& app, Suspend const& susp, std::string const& jobName)
: suspend(susp ? susp : dontSuspend),
yield(suspendForJobQueue(app, suspend, jobName))
{
// There's a non-empty jobName exactly if there's a non-empty Suspend.
assert(!(susp && jobName.empty()));
}
JobQueueSuspender::JobQueueSuspender(Application &app)
: JobQueueSuspender(app, {}, {})
{
}
} // RPC
} // ripple

View File

@@ -1,133 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/rpc/Coroutine.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/tests/TestOutputSuite.test.h>
namespace ripple {
namespace RPC {
class Coroutine_test : public TestOutputSuite
{
public:
using Strings = std::vector <std::string>;
void test (int chunkSize, Strings const& expected)
{
auto name = std::to_string (chunkSize);
setup (name);
std::string buffer;
Json::Output output = Json::stringOutput (buffer);
auto makeContinuation = [&] (std::string const& data) {
return Continuation ([=] (Callback const& cb) {
output (data + " ");
cb();
});
};
Strings result;
Coroutine coroutine ([&] (Suspend const& suspend)
{
Callback yield ([=] () { suspend (makeContinuation ("*")); });
auto out = chunkedYieldingOutput (output, yield, chunkSize);
out ("hello ");
result.push_back (buffer);
suspend (makeContinuation("HELLO"));
result.push_back (buffer);
out ("there ");
result.push_back (buffer);
suspend (makeContinuation("THERE"));
result.push_back (buffer);
out ("world ");
result.push_back (buffer);
suspend (makeContinuation("WORLD"));
result.push_back (buffer);
});
runOnCoroutine(UseCoroutines::yes, coroutine);
expectCollectionEquals (result, expected);
}
void run() override
{
test (0, {"hello ",
"hello HELLO ",
"hello HELLO * there ",
"hello HELLO * there THERE ",
"hello HELLO * there THERE * world ",
"hello HELLO * there THERE * world WORLD "
});
test (3, {"hello ",
"hello HELLO ",
"hello HELLO * there ",
"hello HELLO * there THERE ",
"hello HELLO * there THERE * world ",
"hello HELLO * there THERE * world WORLD "
});
test (5, {"hello ",
"hello HELLO ",
"hello HELLO * there ",
"hello HELLO * there THERE ",
"hello HELLO * there THERE * world ",
"hello HELLO * there THERE * world WORLD "
});
test (7, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE * world ",
"hello HELLO there THERE * world WORLD "
});
test (10, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE * world ",
"hello HELLO there THERE * world WORLD "
});
test (13, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE world ",
"hello HELLO there THERE world WORLD "
});
test (15, {"hello ",
"hello HELLO ",
"hello HELLO there ",
"hello HELLO there THERE ",
"hello HELLO there THERE world ",
"hello HELLO there THERE world WORLD "
});
}
};
BEAST_DEFINE_TESTSUITE(Coroutine, RPC, ripple);
} // RPC
} // ripple

View File

@@ -1,107 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/rpc/Yield.h>
#include <ripple/rpc/tests/TestOutputSuite.test.h>
namespace ripple {
namespace RPC {
struct Yield_test : TestOutputSuite
{
void chunkedYieldingTest ()
{
setup ("chunkedYieldingTest");
std::string lastYield;
auto yield = [&]() { lastYield = output_; };
auto output = chunkedYieldingOutput (
Json::stringOutput (output_), yield, 5);
output ("hello");
expectResult ("hello");
expectEquals (lastYield, "");
output (", th"); // Goes over the boundary.
expectResult ("hello, th");
expectEquals (lastYield, "");
output ("ere!"); // Forces a yield.
expectResult ("hello, there!");
expectEquals (lastYield, "hello, th");
output ("!!");
expectResult ("hello, there!!!");
expectEquals (lastYield, "hello, th");
output (""); // Forces a yield.
expectResult ("hello, there!!!");
expectEquals (lastYield, "hello, there!!!");
}
void trivialCountedYieldTest()
{
setup ("trivialCountedYield");
auto didYield = false;
auto yield = [&]() { didYield = true; };
CountedYield cy (0, yield);
for (auto i = 0; i < 4; ++i)
{
cy.yield();
expect (!didYield, "We yielded when we shouldn't have.");
}
}
void countedYieldTest()
{
setup ("countedYield");
auto didYield = false;
auto yield = [&]() { didYield = true; };
CountedYield cy (5, yield);
for (auto j = 0; j < 3; ++j)
{
for (auto i = 0; i < 4; ++i)
{
cy.yield();
expect (!didYield, "We yielded when we shouldn't have.");
}
cy.yield();
expect (didYield, "We didn't yield");
didYield = false;
}
}
void run () override
{
chunkedYieldingTest();
trivialCountedYieldTest();
countedYieldTest();
}
};
BEAST_DEFINE_TESTSUITE(Yield, ripple_basics, ripple);
} // RPC
} // ripple

View File

@@ -23,7 +23,6 @@
#include <ripple/basics/BasicConfig.h>
#include <ripple/server/Port.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/rpc/Yield.h>
#include <beast/utility/Journal.h>
#include <beast/utility/PropertyStream.h>
#include <boost/asio/ip/address.hpp>
@@ -67,7 +66,6 @@ public:
};
overlay_t overlay;
RPC::YieldStrategy yieldStrategy;
void
makeContexts();

View File

@@ -32,7 +32,6 @@
#include <ripple/overlay/Overlay.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/Coroutine.h>
#include <ripple/rpc/impl/Tuning.h>
#include <beast/crypto/base64.h>
#include <ripple/rpc/RPCHandler.h>
@@ -179,17 +178,11 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
return;
}
auto detach = session.detach();
// We can copy `this` because ServerHandlerImp is a long-lasting singleton.
auto job = [this, detach] (Job&) {
RPC::runOnCoroutine(
setup_.yieldStrategy.useCoroutines,
[this, detach] (RPC::Suspend const& suspend) {
processSession(detach, suspend);
m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
[this, detach = session.detach()](std::shared_ptr<JobCoro> jc)
{
processSession(detach, jc);
});
};
m_jobQueue.addJob(jtCLIENT, "RPC-Client", job);
}
void
@@ -208,15 +201,11 @@ ServerHandlerImp::onStopped (HTTP::Server&)
// Run as a couroutine.
void
ServerHandlerImp::processSession (
std::shared_ptr<HTTP::Session> const& session, Suspend const& suspend)
ServerHandlerImp::processSession (std::shared_ptr<HTTP::Session> const& session,
std::shared_ptr<JobCoro> jobCoro)
{
processRequest (
session->port(),
to_string (session->body()),
session->remoteAddress().at_port (0),
makeOutput (*session),
suspend);
processRequest (session->port(), to_string (session->body()),
session->remoteAddress().at_port (0), makeOutput (*session), jobCoro);
if (session->request().keep_alive())
session->complete();
@@ -225,12 +214,9 @@ ServerHandlerImp::processSession (
}
void
ServerHandlerImp::processRequest (
HTTP::Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&& output,
Suspend const& suspend)
ServerHandlerImp::processRequest (HTTP::Port const& port,
std::string const& request, beast::IP::Endpoint const& remoteIPAddress,
Output&& output, std::shared_ptr<JobCoro> jobCoro)
{
auto rpcJ = app_.journal ("RPC");
// Move off the webserver thread onto the JobQueue.
@@ -352,20 +338,10 @@ ServerHandlerImp::processRequest (
auto const start (std::chrono::high_resolution_clock::now ());
RPC::Context context {
m_journal, params, app_, loadType, m_networkOPs, app_.getLedgerMaster(), role,
{app_, suspend, "RPC-Coroutine"}};
std::string response;
if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes)
{
executeRPC (context, response, setup_.yieldStrategy);
}
else
{
RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
app_.getLedgerMaster(), role, jobCoro};
Json::Value result;
RPC::doCommand (context, result, setup_.yieldStrategy);
RPC::doCommand (context, result);
// Always report "status". On an error report the request as received.
if (result.isMember (jss::error))
@@ -383,8 +359,7 @@ ServerHandlerImp::processRequest (
Json::Value reply (Json::objectValue);
reply[jss::result] = std::move (result);
response = to_string (reply);
}
auto response = to_string (reply);
rpc_time_.notify (static_cast <beast::insight::Event::value_type> (
std::chrono::duration_cast <std::chrono::milliseconds> (
@@ -747,7 +722,6 @@ setup_ServerHandler(BasicConfig const& config, std::ostream& log)
{
ServerHandler::Setup setup;
setup.ports = detail::parse_Ports(config, log);
setup.yieldStrategy = RPC::makeYieldStrategy(config);
detail::setup_Client(setup);
detail::setup_Overlay(setup);

View File

@@ -21,6 +21,7 @@
#define RIPPLE_SERVER_SERVERHANDLERIMP_H_INCLUDED
#include <ripple/core/Job.h>
#include <ripple/core/JobCoro.h>
#include <ripple/json/Output.h>
#include <ripple/server/ServerHandler.h>
#include <ripple/server/Session.h>
@@ -57,7 +58,6 @@ public:
private:
using Output = Json::Output;
using Suspend = RPC::Suspend;
void
setup (Setup const& setup, beast::Journal journal) override;
@@ -109,11 +109,13 @@ private:
//--------------------------------------------------------------------------
void
processSession (std::shared_ptr<HTTP::Session> const&, Suspend const&);
processSession (std::shared_ptr<HTTP::Session> const&,
std::shared_ptr<JobCoro> jobCoro);
void
processRequest (HTTP::Port const& port, std::string const& request,
beast::IP::Endpoint const& remoteIPAddress, Output&&, Suspend const&);
beast::IP::Endpoint const& remoteIPAddress, Output&&,
std::shared_ptr<JobCoro> jobCoro);
//
// PropertyStream

View File

@@ -29,5 +29,6 @@
#include <ripple/core/impl/SNTPClock.cpp>
#include <ripple/core/impl/TimeKeeper.cpp>
#include <ripple/core/tests/LoadFeeTrack.test.cpp>
#include <ripple/core/tests/Config.test.cpp>
#include <ripple/core/tests/Coroutine.test.cpp>
#include <ripple/core/tests/LoadFeeTrack.test.cpp>

View File

@@ -26,10 +26,8 @@
#include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/impl/Coroutine.cpp>
#include <ripple/rpc/impl/RPCHandler.cpp>
#include <ripple/rpc/impl/Status.cpp>
#include <ripple/rpc/impl/Yield.cpp>
#include <ripple/rpc/impl/Utilities.cpp>
#include <ripple/rpc/handlers/Handlers.h>
@@ -106,8 +104,6 @@
#include <ripple/rpc/impl/TransactionSign.cpp>
#include <ripple/rpc/impl/RPCVersion.cpp>
#include <ripple/rpc/tests/Coroutine.test.cpp>
#include <ripple/rpc/tests/JSONRPC.test.cpp>
#include <ripple/rpc/tests/KeyGeneration.test.cpp>
#include <ripple/rpc/tests/Status.test.cpp>
#include <ripple/rpc/tests/Yield.test.cpp>

View File

@@ -24,6 +24,7 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/Log.h>
#include <ripple/core/JobCoro.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/net/RPCErr.h>
@@ -31,13 +32,11 @@
#include <ripple/protocol/JsonFields.h>
#include <ripple/resource/Fees.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/rpc/Coroutine.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/server/Port.h>
#include <ripple/server/Role.h>
#include <ripple/json/to_string.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/rpc/Yield.h>
#include <ripple/server/Role.h>
#include <ripple/websocket/WebSocket.h>
@@ -97,7 +96,8 @@ public:
message_ptr getMessage ();
bool checkMessage ();
void returnMessage (message_ptr const&);
Json::Value invokeCommand (Json::Value const& jvRequest, RPC::Suspend const&);
Json::Value invokeCommand (Json::Value const& jvRequest,
std::shared_ptr<JobCoro> jobCoro);
// Generically implemented per version.
void setPingTimer ();
@@ -240,7 +240,7 @@ void ConnectionImpl <WebSocket>::returnMessage (message_ptr const& ptr)
template <class WebSocket>
Json::Value ConnectionImpl <WebSocket>::invokeCommand (
Json::Value const& jvRequest, RPC::Suspend const& suspend)
Json::Value const& jvRequest, std::shared_ptr<JobCoro> jobCoro)
{
if (getConsumer().disconnect ())
{
@@ -281,10 +281,9 @@ Json::Value ConnectionImpl <WebSocket>::invokeCommand (
}
else
{
RPC::Context context {
app_.journal ("RPCHandler"), jvRequest, app_, loadType, m_netOPs, app_.getLedgerMaster(),
role, {app_, suspend, "WSClient::command"},
this->shared_from_this ()};
RPC::Context context {app_.journal ("RPCHandler"), jvRequest,
app_, loadType, m_netOPs, app_.getLedgerMaster(), role,
jobCoro, this->shared_from_this ()};
RPC::doCommand (context, jvResult[jss::result]);
}

View File

@@ -378,8 +378,8 @@ public:
message_job("more", cpClient);
}
bool do_message (Job& job, const connection_ptr& cpClient,
const wsc_ptr& conn, const message_ptr& mpMessage)
bool do_message (Job& job, const connection_ptr cpClient,
wsc_ptr conn, const message_ptr& mpMessage)
{
Json::Value jvRequest;
Json::Reader jrReader;
@@ -425,37 +425,23 @@ public:
job.rename ("WSClient::" + jCmd.asString());
}
auto const start = std::chrono::high_resolution_clock::now ();
struct HandlerCoroutineData
app_.getJobQueue().postCoro(jtCLIENT, "WSClient",
[this, conn, cpClient, jvRequest = std::move(jvRequest)]
(std::shared_ptr<JobCoro> jc)
{
Json::Value jvRequest;
std::string buffer;
wsc_ptr conn;
};
auto data = std::make_shared<HandlerCoroutineData>();
data->jvRequest = std::move(jvRequest);
data->conn = conn;
auto j = app_.journal ("RPCHandler");
auto coroutine = [data, j] (RPC::Suspend const& suspend) {
data->buffer = to_string(
data->conn->invokeCommand(
data->jvRequest, suspend));
};
static auto const disableWebsocketsCoroutines = true;
auto useCoroutines = disableWebsocketsCoroutines ?
RPC::UseCoroutines::no : RPC::useCoroutines(desc_.config);
runOnCoroutine(useCoroutines, coroutine);
rpc_time_.notify (static_cast <beast::insight::Event::value_type> (
std::chrono::duration_cast <std::chrono::milliseconds> (
std::chrono::high_resolution_clock::now () - start)));
using namespace std::chrono;
auto const start = high_resolution_clock::now();
auto buffer = to_string(conn->invokeCommand(jvRequest, jc));
rpc_time_.notify (
static_cast <beast::insight::Event::value_type> (
duration_cast <milliseconds> (
high_resolution_clock::now () - start)));
++rpc_requests_;
rpc_size_.notify (static_cast <beast::insight::Event::value_type>
(data->buffer.size()));
send (cpClient, data->buffer, false);
rpc_size_.notify (
static_cast <beast::insight::Event::value_type>
(buffer.size()));
send (cpClient, buffer, false);
});
}
return true;