diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index 02000b7fe..4c0c2c9aa 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -2090,6 +2090,10 @@
+
+
+
+
@@ -2112,6 +2116,12 @@
..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
+
+ True
+ True
+ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
+ ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories)
+
True
True
@@ -3153,8 +3163,6 @@
-
-
True
True
@@ -3419,10 +3427,6 @@
-
- True
- True
-
True
True
@@ -3485,10 +3489,6 @@
-
- True
- True
-
@@ -3499,10 +3499,6 @@
-
- True
- True
-
True
True
@@ -3517,12 +3513,6 @@
-
- True
- True
-
-
-
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index cc6d5a1f7..3680883f6 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -2799,6 +2799,12 @@
ripple\core
+
+ ripple\core
+
+
+ ripple\core
+
ripple\core
@@ -2826,6 +2832,9 @@
ripple\core\tests
+
+ ripple\core\tests
+
ripple\core\tests
@@ -3843,9 +3852,6 @@
ripple\rpc
-
- ripple\rpc
-
ripple\rpc\handlers
@@ -4053,9 +4059,6 @@
ripple\rpc\impl
-
- ripple\rpc\impl
-
ripple\rpc\impl
@@ -4116,9 +4119,6 @@
ripple\rpc\impl
-
- ripple\rpc\impl
-
ripple\rpc
@@ -4134,9 +4134,6 @@
ripple\rpc
-
- ripple\rpc\tests
-
ripple\rpc\tests
@@ -4149,12 +4146,6 @@
ripple\rpc\tests
-
- ripple\rpc\tests
-
-
- ripple\rpc
-
ripple\server
diff --git a/src/ripple/app/ledger/LedgerToJson.h b/src/ripple/app/ledger/LedgerToJson.h
index 4ef4314ab..2a0a86efa 100644
--- a/src/ripple/app/ledger/LedgerToJson.h
+++ b/src/ripple/app/ledger/LedgerToJson.h
@@ -25,7 +25,6 @@
#include
#include
#include
-#include
#include
#include
@@ -33,14 +32,9 @@ namespace ripple {
struct LedgerFill
{
- LedgerFill (ReadView const& l,
- int o = 0,
- RPC::Callback const& y = {},
- RPC::YieldStrategy const& ys = {})
- : ledger (l),
- options (o),
- yield (y),
- yieldStrategy (ys)
+ LedgerFill (ReadView const& l, int o = 0)
+ : ledger (l)
+ , options (o)
{
}
@@ -49,8 +43,6 @@ struct LedgerFill
ReadView const& ledger;
int options;
- RPC::Callback yield;
- RPC::YieldStrategy yieldStrategy;
};
/** Given a Ledger and options, fill a Json::Object or Json::Value with a
diff --git a/src/ripple/app/ledger/impl/LedgerToJson.cpp b/src/ripple/app/ledger/impl/LedgerToJson.cpp
index c55ec83ba..5113ba023 100644
--- a/src/ripple/app/ledger/impl/LedgerToJson.cpp
+++ b/src/ripple/app/ledger/impl/LedgerToJson.cpp
@@ -88,15 +88,10 @@ void fillJsonTx (Object& json, LedgerFill const& fill)
auto bBinary = isBinary(fill);
auto bExpanded = isExpanded(fill);
- RPC::CountedYield count (
- fill.yieldStrategy.transactionYieldCount, fill.yield);
-
try
{
for (auto& i: fill.ledger.txs)
{
- count.yield();
-
if (! bExpanded)
{
txns.append(to_string(i.first->getTransactionID()));
@@ -128,15 +123,11 @@ void fillJsonState(Object& json, LedgerFill const& fill)
{
auto& ledger = fill.ledger;
auto&& array = Json::setArray (json, jss::accountState);
- RPC::CountedYield count (
- fill.yieldStrategy.accountYieldCount, fill.yield);
-
auto expanded = isExpanded(fill);
auto binary = isBinary(fill);
for(auto const& sle : ledger.sles)
{
- count.yield();
if (binary)
{
auto&& obj = appendObject(array);
diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index a37a2ec01..9e43e42ff 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -406,8 +406,9 @@ public:
// The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue.
//
- , m_jobQueue (make_JobQueue (m_collectorManager->group ("jobq"),
- m_nodeStoreScheduler, logs_->journal("JobQueue"), *logs_))
+ , m_jobQueue (std::make_unique<JobQueue>(
+ m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
+ logs_->journal("JobQueue"), *logs_))
//
// Anything which calls addJob must be a descendant of the JobQueue
diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp
index 8eb7a3c42..60075bfcf 100644
--- a/src/ripple/app/main/Main.cpp
+++ b/src/ripple/app/main/Main.cpp
@@ -95,9 +95,8 @@ void startServer (Application& app)
std::cerr << "Startup RPC: " << jvCommand << std::endl;
Resource::Charge loadType = Resource::feeReferenceRPC;
- RPC::Context context {
- app.journal ("RPCHandler"), jvCommand, app, loadType, app.getOPs (),
- app.getLedgerMaster(), Role::ADMIN, app};
+ RPC::Context context {app.journal ("RPCHandler"), jvCommand, app,
+ loadType, app.getOPs (), app.getLedgerMaster(), Role::ADMIN};
Json::Value jvResult;
RPC::doCommand (context, jvResult);
diff --git a/src/ripple/app/paths/PathRequest.cpp b/src/ripple/app/paths/PathRequest.cpp
index 9c7389aab..3766c0084 100644
--- a/src/ripple/app/paths/PathRequest.cpp
+++ b/src/ripple/app/paths/PathRequest.cpp
@@ -232,6 +232,15 @@ bool PathRequest::isValid (RippleLineCache::ref crCache)
return true;
}
+/* If this is a normal path request, we want to run it once "fast" now
+ to give preliminary results.
+
+ If this is a legacy path request, we are only going to run it once,
+ and we can't run it in full now, so we don't want to run it at all.
+
+ If there's an error, we need to be sure to return it to the caller
+ in all cases.
+*/
Json::Value PathRequest::doCreate (
RippleLineCache::ref& cache,
Json::Value const& value,
@@ -242,8 +251,8 @@ Json::Value PathRequest::doCreate (
if (parseJson (value) != PFR_PJ_INVALID)
{
valid = isValid (cache);
- if (! hasCompletion())
- status = valid ? doUpdate(cache, true) : jvStatus;
+ status = valid && ! hasCompletion()
+ ? doUpdate(cache, true) : jvStatus;
}
else
{
diff --git a/src/ripple/core/JobCoro.h b/src/ripple/core/JobCoro.h
new file mode 100644
index 000000000..47e1cda12
--- /dev/null
+++ b/src/ripple/core/JobCoro.h
@@ -0,0 +1,79 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_CORE_JOBCORO_H_INCLUDED
+#define RIPPLE_CORE_JOBCORO_H_INCLUDED
+
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+
+class JobQueue;
+
+namespace detail {
+
+struct JobCoro_create_t { };
+
+} // detail
+
+class JobCoro : public std::enable_shared_from_this <JobCoro>
+{
+private:
+ JobQueue& jq_;
+ JobType type_;
+ std::string name_;
+ std::mutex mutex_;
+ boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
+ boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
+
+public:
+ // Private: Used in the implementation
+ template <class F>
+ JobCoro (detail::JobCoro_create_t, JobQueue&, JobType,
+ std::string const&, F&&);
+
+ /** Suspend coroutine execution.
+ Effects:
+ The coroutine's stack is saved.
+ The associated Job thread is released.
+ Note:
+ The associated Job function returns.
+ Undefined behavior if called consecutively without a corresponding post.
+ */
+ void yield () const;
+
+ /** Schedule coroutine execution
+ Effects:
+ Returns immediately.
+ A new job is scheduled to resume the execution of the coroutine.
+ When the job runs, the coroutine's stack is restored and execution
+ continues at the beginning of coroutine function or the statement
+ after the previous call to yield.
+ Undefined behavior if called consecutively without a corresponding yield.
+ */
+ void post ();
+};
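+
+// A minimal usage sketch (illustrative only; `jq`, the job type, and
+// `startAsyncWork` are placeholders, not part of this interface):
+//
+//     jq.postCoro (jtCLIENT, "example",
+//         [] (std::shared_ptr <JobCoro> jc)
+//         {
+//             startAsyncWork ([jc] { jc->post(); }); // resume when done
+//             jc->yield();                           // suspend until post()
+//             // ... continue with the result ...
+//         });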
+
+} // ripple
+
+#endif
diff --git a/src/ripple/rpc/Coroutine.h b/src/ripple/core/JobCoro.ipp
similarity index 56%
rename from src/ripple/rpc/Coroutine.h
rename to src/ripple/core/JobCoro.ipp
index 1a4d47f17..bc79286c6 100644
--- a/src/ripple/rpc/Coroutine.h
+++ b/src/ripple/core/JobCoro.ipp
@@ -17,25 +17,48 @@
*/
//==============================================================================
-#ifndef RIPPLE_RPC_COROUTINE_H_INCLUDED
-#define RIPPLE_RPC_COROUTINE_H_INCLUDED
-
-#include
+#ifndef RIPPLE_CORE_JOBCOROINL_H_INCLUDED
+#define RIPPLE_CORE_JOBCOROINL_H_INCLUDED
namespace ripple {
-namespace RPC {
-/** Coroutine is a function that is given to the coroutine scheduler which
- later gets called with a Suspend. A Coroutine can't be empty. */
-using Coroutine = std::function ;
+template <class F>
+JobCoro::JobCoro (detail::JobCoro_create_t, JobQueue& jq, JobType type,
+ std::string const& name, F&& f)
+ : jq_(jq)
+ , type_(type)
+ , name_(name)
+ , coro_(
+ [this, fn = std::forward<F>(f)]
+ (boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield)
+ {
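+ // Capture the push side so yield() can suspend later, then suspend
+ // immediately: this returns control to the constructor without running
+ // fn. The coroutine body starts on the first post().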
+ yield_ = &do_yield;
+ (*yield_)();
+ fn(shared_from_this());
+ }, boost::coroutines::attributes (1024 * 1024))
+{
+}
-/** Run as a coroutine. */
-void runOnCoroutine(Coroutine const&);
+inline
+void
+JobCoro::yield () const
+{
+ (*yield_)();
+}
-/** Run as coroutine if UseCoroutines::yes, otherwise run immediately. */
-void runOnCoroutine(UseCoroutines, Coroutine const&);
+inline
+void
+JobCoro::post ()
+{
+ // sp keeps 'this' alive
+ jq_.addJob(type_, name_,
+ [this, sp = shared_from_this()](Job&)
+ {
+ std::lock_guard <std::mutex> lock (mutex_);
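+ // coro_() returns control when the coroutine next yields or finishes;
+ // holding mutex_ for that entire span serializes resumptions (see the
+ // note in JobQueue.h).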
+ coro_();
+ });
+}
-} // RPC
} // ripple
#endif
diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h
index b7e55a6f7..6a90c0761 100644
--- a/src/ripple/core/JobQueue.h
+++ b/src/ripple/core/JobQueue.h
@@ -21,70 +21,277 @@
#define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#include
+#include
+#include
#include
#include
#include
+#include
#include
-#include
#include
+#include
namespace ripple {
class Logs;
-class JobQueue : public beast::Stoppable
+class JobQueue
+ : public beast::Stoppable
+ , private beast::Workers::Callback
{
-protected:
- JobQueue (char const* name, Stoppable& parent);
-
public:
- using JobFunction = std::function ;
- virtual ~JobQueue () { }
+ using JobFunction = std::function <void (Job&)>;
- virtual void addJob (
- JobType, std::string const& name, JobFunction const&) = 0;
+ JobQueue (beast::insight::Collector::ptr const& collector,
+ Stoppable& parent, beast::Journal journal, Logs& logs);
+ ~JobQueue ();
- // Jobs waiting at this priority
- virtual int getJobCount (JobType t) const = 0;
+ void addJob (JobType type, std::string const& name, JobFunction const& func);
- // Jobs waiting plus running at this priority
- virtual int getJobCountTotal (JobType t) const = 0;
+ /** Creates a coroutine and adds a job to the queue which will run it.
- // All waiting jobs at or greater than this priority
- virtual int getJobCountGE (JobType t) const = 0;
+ @param t The type of job.
+ @param name Name of the job.
+ @param f Has a signature of void(std::shared_ptr<JobCoro>). Called when the job executes.
+ */
+ template <class F>
+ void postCoro (JobType t, std::string const& name, F&& f);
- virtual void shutdown () = 0;
+ /** Jobs waiting at this priority.
+ */
+ int getJobCount (JobType t) const;
- virtual void setThreadCount (int c, bool const standaloneMode) = 0;
+ /** Jobs waiting plus running at this priority.
+ */
+ int getJobCountTotal (JobType t) const;
+
+ /** All waiting jobs at or greater than this priority.
+ */
+ int getJobCountGE (JobType t) const;
+
+ /** Shut down the job queue without completing pending jobs.
+ */
+ void shutdown ();
+
+ /** Set the number of threads serving the job queue to precisely this number.
+ */
+ void setThreadCount (int c, bool const standaloneMode);
// VFALCO TODO Rename these to newLoadEventMeasurement or something similar
// since they create the object.
- //
- virtual LoadEvent::pointer getLoadEvent (
- JobType t, std::string const& name) = 0;
+ LoadEvent::pointer getLoadEvent (JobType t, std::string const& name);
// VFALCO TODO Why do we need two versions, one which returns a shared
// pointer and the other which returns an autoptr?
- //
- virtual LoadEvent::autoptr getLoadEventAP (
- JobType t, std::string const& name) = 0;
+ LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name);
- // Add multiple load events
- virtual void addLoadEvents (
- JobType t, int count, std::chrono::milliseconds elapsed) = 0;
+ /** Add multiple load events.
+ */
+ void addLoadEvents (JobType t, int count, std::chrono::milliseconds elapsed);
- virtual bool isOverloaded () = 0;
+ // Cannot be const because LoadMonitor has no const methods.
+ bool isOverloaded ();
/** Get the Job corresponding to a thread. If no thread, use the current
thread. */
- virtual Job* getJobForThread (std::thread::id const& id = {}) const = 0;
+ Job* getJobForThread(std::thread::id const& id = {}) const;
- virtual Json::Value getJson (int c = 0) = 0;
+ // Cannot be const because LoadMonitor has no const methods.
+ Json::Value getJson (int c = 0);
+
+private:
+ using JobDataMap = std::map <JobType, JobTypeData>;
+
+ beast::Journal m_journal;
+ mutable std::mutex m_mutex;
+ std::uint64_t m_lastJob;
+ std::set <Job> m_jobSet;
+ JobDataMap m_jobData;
+ JobTypeData m_invalidJobData;
+
+ std::map <std::thread::id, Job*> m_threadIds;
+
+ // The number of jobs currently in processTask()
+ int m_processCount;
+
+ beast::Workers m_workers;
+ Job::CancelCallback m_cancelCallback;
+
+ // Statistics tracking
+ beast::insight::Collector::ptr m_collector;
+ beast::insight::Gauge job_count;
+ beast::insight::Hook hook;
+
+ static JobTypes const& getJobTypes()
+ {
+ static JobTypes types;
+ return types;
+ }
+
+ void collect();
+ JobTypeData& getJobTypeData (JobType type);
+
+ // Signals the service stopped if the stopped condition is met.
+ void checkStopped (std::lock_guard <std::mutex> const& lock);
+
+ // Signals an added Job for processing.
+ //
+ // Pre-conditions:
+ // The JobType must be valid.
+ // The Job must exist in mJobSet.
+ // The Job must not have previously been queued.
+ //
+ // Post-conditions:
+ // Count of waiting jobs of that type will be incremented.
+ // If JobQueue exists, and has at least one thread, Job will eventually run.
+ //
+ // Invariants:
+ // The calling thread owns the JobLock
+ void queueJob (Job const& job, std::lock_guard <std::mutex> const& lock);
+
+ // Returns the next Job we should run now.
+ //
+ // RunnableJob:
+ // A Job in the JobSet whose slots count for its type is greater than zero.
+ //
+ // Pre-conditions:
+ // mJobSet must not be empty.
+ // mJobSet holds at least one RunnableJob
+ //
+ // Post-conditions:
+ // job is a valid Job object.
+ // job is removed from mJobQueue.
+ // Waiting job count of its type is decremented
+ // Running job count of its type is incremented
+ //
+ // Invariants:
+ // The calling thread owns the JobLock
+ void getNextJob (Job& job);
+
+ // Indicates that a running Job has completed its task.
+ //
+ // Pre-conditions:
+ // Job must not exist in mJobSet.
+ // The JobType must not be invalid.
+ //
+ // Post-conditions:
+ // The running count of that JobType is decremented
+ // A new task is signaled if there are more waiting Jobs than the limit, if any.
+ //
+ // Invariants:
+ //
+ void finishJob (Job const& job);
+
+ template <class Rep, class Period>
+ void on_dequeue (JobType type,
+ std::chrono::duration <Rep, Period> const& value);
+
+ template <class Rep, class Period>
+ void on_execute (JobType type,
+ std::chrono::duration <Rep, Period> const& value);
+
+ // Runs the next appropriate waiting Job.
+ //
+ // Pre-conditions:
+ // A RunnableJob must exist in the JobSet
+ //
+ // Post-conditions:
+ // The chosen RunnableJob will have Job::doJob() called.
+ //
+ // Invariants:
+ //
+ void processTask () override;
+
+ // Returns `true` if all jobs of this type should be skipped when
+ // the JobQueue receives a stop notification. If the job type isn't
+ // skipped, the Job will be called and the job must call Job::shouldCancel
+ // to determine if a long running or non-mandatory operation should be canceled.
+ bool skipOnStop (JobType type);
+
+ // Returns the limit of running jobs for the given job type.
+ // For jobs with no limit, we return the largest int. Hopefully that
+ // will be enough.
+ int getJobLimit (JobType type);
+
+ void onStop () override;
+ void onChildrenStopped () override;
};
-std::unique_ptr
-make_JobQueue (beast::insight::Collector::ptr const& collector,
- beast::Stoppable& parent, beast::Journal journal, Logs& logs);
+/*
+    An RPC command arrives and is handled by either ServerHandler (HTTP) or
+    Handler (websocket), depending on the connection type. The handler calls
+    JobQueue::postCoro() to create a coroutine and run it at a later point.
+    This frees the handler thread to keep servicing other requests while the
+    RPC command completes its work asynchronously.
+
+    postCoro() creates a JobCoro object. When the JobCoro constructor runs and
+    its coro_ member (a boost::coroutines pull_type) is initialized, execution
+    automatically passes into the coroutine, which we do not want at this
+    point because we are still on the handler thread. Constructing a boost
+    pull_type always transfers execution to the coroutine; the pull_type also
+    generates a push_type that is passed as the parameter (do_yield) of the
+    function the pull_type was created with. That function is invoked
+    immediately during the construction of coro_; inside it, JobCoro::yield_
+    is assigned the address of the push_type parameter (do_yield) and then
+    called, which returns execution to the caller's stack.
+
+    postCoro() then calls JobCoro::post(), which schedules a job on the job
+    queue to continue executing the coroutine on a JobQueue worker thread at
+    some later time. When that job runs, we lock JobCoro::mutex_ and call
+    coro_, which continues where we left off. Since the last thing done inside
+    coro_ was the call to yield(), execution resumes by calling the function f
+    that was passed to the JobCoro constructor. It is within that function
+    body that callers specify the work they want to do on the coroutine, and
+    they may suspend and resume its execution there. A task that depends on
+    other events to complete, such as path finding, calls JobCoro::yield() to
+    suspend while waiting for those events and continues when signaled via
+    JobCoro::post().
+
+    A potential race condition exists here: post() can be called before
+    yield() once f is running. The problem only arises if the job that post()
+    scheduled executes before yield() is called; were that to happen, the
+    behavior would be undefined. The lock ensures that coro_ is not entered
+    again until the current resumption has exited the coroutine. At that point
+    a scheduled post() job waiting on the lock gains entry and either resumes
+    the coroutine at the yield(), or, if the coroutine has already completed,
+    harmlessly calls coro_ and returns immediately.
+
+    The race condition occurs as follows:
+
+    1- The coroutine is running.
+    2- The coroutine is about to suspend, but before it can do so, it must
+       arrange for some event to wake it up.
+    3- The coroutine arranges for some event to wake it up.
+    4- Before the coroutine can suspend, that event occurs and the resumption
+       of the coroutine is scheduled on the job queue.
+    5- Still before the coroutine can suspend, the resumption job is
+       dispatched.
+    6- Still before the coroutine can suspend, the resumption job runs the
+       coroutine.
+    The coroutine is now running on two threads.
+
+    The lock prevents this: step 6 blocks until the lock is released, which
+    happens only once the earlier resumption has left the coroutine (when it
+    suspends or completes).
+*/
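+
+/*
+    A sketch of that flow from the handler side (assumes access to the
+    application's JobQueue; names other than postCoro/yield/post are
+    illustrative):
+
+        app.getJobQueue().postCoro (jtCLIENT, "RPC-example",
+            [] (std::shared_ptr <JobCoro> jc)
+            {
+                // Arrange the wake-up *before* suspending.
+                someAsyncOperation ([jc] { jc->post(); });
+
+                // Suspend. Even if the completion above has already run,
+                // the mutex in post() keeps a second thread out of the
+                // coroutine until this yield happens.
+                jc->yield();
+
+                // Continue with the results.
+            });
+*/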
+
+} // ripple
+
+#include
+
+namespace ripple {
+
+template <class F>
+void JobQueue::postCoro (JobType t, std::string const& name, F&& f)
+{
+ /* First param is a detail type to make construction private.
+ Last param is the function the coroutine runs. Signature of
+ void(std::shared_ptr<JobCoro>).
+ */
+ auto const coro = std::make_shared<JobCoro>(
+ detail::JobCoro_create_t{}, *this, t, name, std::forward<F>(f));
+ coro->post();
+}
}
diff --git a/src/ripple/core/JobTypeData.h b/src/ripple/core/JobTypeData.h
index 1aa78b599..33c5cff35 100644
--- a/src/ripple/core/JobTypeData.h
+++ b/src/ripple/core/JobTypeData.h
@@ -22,6 +22,7 @@
#include
#include
+#include
namespace ripple
{
diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp
index f2ab9d185..c48df62be 100644
--- a/src/ripple/core/impl/JobQueue.cpp
+++ b/src/ripple/core/impl/JobQueue.cpp
@@ -32,672 +32,546 @@
namespace ripple {
-class JobQueueImp
- : public JobQueue
- , private beast::Workers::Callback
+JobQueue::JobQueue (beast::insight::Collector::ptr const& collector,
+ Stoppable& parent, beast::Journal journal, Logs& logs)
+ : Stoppable ("JobQueue", parent)
+ , m_journal (journal)
+ , m_lastJob (0)
+ , m_invalidJobData (getJobTypes ().getInvalid (), collector, logs)
+ , m_processCount (0)
+ , m_workers (*this, "JobQueue", 0)
+ , m_cancelCallback (std::bind (&Stoppable::isStopping, this))
+ , m_collector (collector)
{
-public:
- using JobSet = std::set ;
- using JobDataMap = std::map ;
- using ScopedLock = std::lock_guard ;
- using ThreadIdMap = std::map ;
+ hook = m_collector->make_hook (std::bind (&JobQueue::collect, this));
+ job_count = m_collector->make_gauge ("job_count");
- beast::Journal m_journal;
- mutable std::mutex m_mutex;
- std::uint64_t m_lastJob;
- JobSet m_jobSet;
- JobDataMap m_jobData;
- JobTypeData m_invalidJobData;
-
- ThreadIdMap m_threadIds;
-
- // The number of jobs currently in processTask()
- int m_processCount;
-
- beast::Workers m_workers;
- Job::CancelCallback m_cancelCallback;
-
- // statistics tracking
- beast::insight::Collector::ptr m_collector;
- beast::insight::Gauge job_count;
- beast::insight::Hook hook;
-
- //--------------------------------------------------------------------------
- static JobTypes const& getJobTypes ()
{
- static JobTypes types;
-
- return types;
- }
-
- //--------------------------------------------------------------------------
- JobQueueImp (beast::insight::Collector::ptr const& collector,
- Stoppable& parent, beast::Journal journal, Logs& logs)
- : JobQueue ("JobQueue", parent)
- , m_journal (journal)
- , m_lastJob (0)
- , m_invalidJobData (getJobTypes ().getInvalid (), collector, logs)
- , m_processCount (0)
- , m_workers (*this, "JobQueue", 0)
- , m_cancelCallback (std::bind (&Stoppable::isStopping, this))
- , m_collector (collector)
- {
- hook = m_collector->make_hook (std::bind (
- &JobQueueImp::collect, this));
- job_count = m_collector->make_gauge ("job_count");
+ std::lock_guard <std::mutex> lock (m_mutex);
+ for (auto const& x : getJobTypes ())
{
- ScopedLock lock (m_mutex);
+ JobTypeInfo const& jt = x.second;
- for (auto const& x : getJobTypes ())
- {
- JobTypeInfo const& jt = x.second;
-
- // And create dynamic information for all jobs
- auto const result (m_jobData.emplace (std::piecewise_construct,
- std::forward_as_tuple (jt.type ()),
- std::forward_as_tuple (jt, m_collector, logs)));
- assert (result.second == true);
- (void) result.second;
- }
+ // And create dynamic information for all jobs
+ auto const result (m_jobData.emplace (std::piecewise_construct,
+ std::forward_as_tuple (jt.type ()),
+ std::forward_as_tuple (jt, m_collector, logs)));
+ assert (result.second == true);
+ (void) result.second;
}
}
+}
+
+JobQueue::~JobQueue ()
+{
+ // Must unhook before destroying
+ hook = beast::insight::Hook ();
+}
+
+void
+JobQueue::collect ()
+{
+ std::lock_guard <std::mutex> lock (m_mutex);
+ job_count = m_jobSet.size ();
+}
+
+void
+JobQueue::addJob (JobType type, std::string const& name,
+ JobFunction const& func)
+{
+ assert (type != jtINVALID);
+
+ auto iter (m_jobData.find (type));
+ assert (iter != m_jobData.end ());
+ if (iter == m_jobData.end ())
+ return;
+
+ JobTypeData& data (iter->second);
+
+ // FIXME: Workaround incorrect client shutdown ordering
+ // do not add jobs to a queue with no threads
+ assert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
- ~JobQueueImp () override
{
- // Must unhook before destroying
- hook = beast::insight::Hook ();
- }
-
- void collect ()
- {
- ScopedLock lock (m_mutex);
- job_count = m_jobSet.size ();
- }
-
- void addJob (
- JobType type, std::string const& name, JobFunction const& func) override
- {
- assert (type != jtINVALID);
-
- JobDataMap::iterator iter (m_jobData.find (type));
- assert (iter != m_jobData.end ());
-
- if (iter == m_jobData.end ())
- return;
-
- JobTypeData& data (iter->second);
-
- // FIXME: Workaround incorrect client shutdown ordering
- // do not add jobs to a queue with no threads
- assert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
-
- {
- // If this goes off it means that a child didn't follow
- // the Stoppable API rules. A job may only be added if:
- //
- // - The JobQueue has NOT stopped
- // AND
- // * We are currently processing jobs
- // OR
- // * We have have pending jobs
- // OR
- // * Not all children are stopped
- //
- ScopedLock lock (m_mutex);
- assert (! isStopped() && (
- m_processCount>0 ||
- ! m_jobSet.empty () ||
- ! areChildrenStopped()));
- }
-
- // Don't even add it to the queue if we're stopping
- // and the job type is marked for skipOnStop.
+ // If this goes off it means that a child didn't follow
+ // the Stoppable API rules. A job may only be added if:
//
- if (isStopping() && skipOnStop (type))
- {
- m_journal.debug <<
- "Skipping addJob ('" << name << "')";
- return;
- }
-
- {
- ScopedLock lock (m_mutex);
-
- std::pair ::iterator, bool> result (
- m_jobSet.insert (Job (type, name, ++m_lastJob,
- data.load (), func, m_cancelCallback)));
- queueJob (*result.first, lock);
- }
- }
-
- int getJobCount (JobType t) const override
- {
- ScopedLock lock (m_mutex);
-
- JobDataMap::const_iterator c = m_jobData.find (t);
-
- return (c == m_jobData.end ())
- ? 0
- : c->second.waiting;
- }
-
- int getJobCountTotal (JobType t) const override
- {
- ScopedLock lock (m_mutex);
-
- JobDataMap::const_iterator c = m_jobData.find (t);
-
- return (c == m_jobData.end ())
- ? 0
- : (c->second.waiting + c->second.running);
- }
-
- int getJobCountGE (JobType t) const override
- {
- // return the number of jobs at this priority level or greater
- int ret = 0;
-
- ScopedLock lock (m_mutex);
-
- for (auto const& x : m_jobData)
- {
- if (x.first >= t)
- ret += x.second.waiting;
- }
-
- return ret;
- }
-
- // shut down the job queue without completing pending jobs
- //
- void shutdown () override
- {
- m_journal.info << "Job queue shutting down";
-
- m_workers.pauseAllThreadsAndWait ();
- }
-
- // set the number of thread serving the job queue to precisely this number
- void setThreadCount (int c, bool const standaloneMode) override
- {
- if (standaloneMode)
- {
- c = 1;
- }
- else if (c == 0)
- {
- c = static_cast(std::thread::hardware_concurrency());
- c = 2 + std::min (c, 4); // I/O will bottleneck
-
- m_journal.info << "Auto-tuning to " << c <<
- " validation/transaction/proposal threads";
- }
-
- m_workers.setNumberOfThreads (c);
- }
-
- LoadEvent::pointer getLoadEvent (
- JobType t, std::string const& name) override
- {
- JobDataMap::iterator iter (m_jobData.find (t));
- assert (iter != m_jobData.end ());
-
- if (iter == m_jobData.end ())
- return std::shared_ptr ();
-
- return std::make_shared (
- std::ref (iter-> second.load ()), name, true);
- }
-
- LoadEvent::autoptr getLoadEventAP (
- JobType t, std::string const& name) override
- {
- JobDataMap::iterator iter (m_jobData.find (t));
- assert (iter != m_jobData.end ());
-
- if (iter == m_jobData.end ())
- return {};
-
- return std::make_unique (iter-> second.load (), name, true);
- }
-
- void addLoadEvents (JobType t,
- int count, std::chrono::milliseconds elapsed) override
- {
- JobDataMap::iterator iter (m_jobData.find (t));
- assert (iter != m_jobData.end ());
- iter->second.load().addSamples (count, elapsed);
- }
-
- // Cannot be const because LoadMonitor has no const methods.
- bool isOverloaded () override
- {
- int count = 0;
-
- for (auto& x : m_jobData)
- {
- if (x.second.load ().isOver ())
- ++count;
- }
-
- return count > 0;
- }
-
- // Cannot be const because LoadMonitor has no const methods.
- Json::Value getJson (int) override
- {
- Json::Value ret (Json::objectValue);
-
- ret["threads"] = m_workers.getNumberOfThreads ();
-
- Json::Value priorities = Json::arrayValue;
-
- ScopedLock lock (m_mutex);
-
- for (auto& x : m_jobData)
- {
- assert (x.first != jtINVALID);
-
- if (x.first == jtGENERIC)
- continue;
-
- JobTypeData& data (x.second);
-
- LoadMonitor::Stats stats (data.stats ());
-
- int waiting (data.waiting);
- int running (data.running);
-
- if ((stats.count != 0) || (waiting != 0) ||
- (stats.latencyPeak != 0) || (running != 0))
- {
- Json::Value& pri = priorities.append (Json::objectValue);
-
- pri["job_type"] = data.name ();
-
- if (stats.isOverloaded)
- pri["over_target"] = true;
-
- if (waiting != 0)
- pri["waiting"] = waiting;
-
- if (stats.count != 0)
- pri["per_second"] = static_cast (stats.count);
-
- if (stats.latencyPeak != 0)
- pri["peak_time"] = static_cast (stats.latencyPeak);
-
- if (stats.latencyAvg != 0)
- pri["avg_time"] = static_cast (stats.latencyAvg);
-
- if (running != 0)
- pri["in_progress"] = running;
- }
- }
-
- ret["job_types"] = priorities;
-
- return ret;
- }
-
- Job* getJobForThread (std::thread::id const& id) const override
- {
- auto tid = (id == std::thread::id()) ? std::this_thread::get_id() : id;
-
- ScopedLock lock (m_mutex);
- auto i = m_threadIds.find (tid);
- return (i == m_threadIds.end()) ? nullptr : i->second;
- }
-
-private:
- //--------------------------------------------------------------------------
- JobTypeData& getJobTypeData (JobType type)
- {
- JobDataMap::iterator c (m_jobData.find (type));
- assert (c != m_jobData.end ());
-
- // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
- // and use something sane.
- if (c == m_jobData.end ())
- return m_invalidJobData;
-
- return c->second;
- }
-
- //--------------------------------------------------------------------------
-
- // Signals the service stopped if the stopped condition is met.
- //
- void checkStopped (ScopedLock const& lock)
- {
- // We are stopped when all of the following are true:
+ // - The JobQueue has NOT stopped
+ // AND
+ // * We are currently processing jobs
+ // OR
+ // * We have pending jobs
+ // OR
+ // * Not all children are stopped
//
- // 1. A stop notification was received
- // 2. All Stoppable children have stopped
- // 3. There are no executing calls to processTask
- // 4. There are no remaining Jobs in the job set
+ std::lock_guard <std::mutex> lock (m_mutex);
+ assert (! isStopped() && (
+ m_processCount>0 ||
+ ! m_jobSet.empty () ||
+ ! areChildrenStopped()));
+ }
+
+ // Don't even add it to the queue if we're stopping
+ // and the job type is marked for skipOnStop.
+ //
+ if (isStopping() && skipOnStop (type))
+ {
+ m_journal.debug <<
+ "Skipping addJob ('" << name << "')";
+ return;
+ }
+
+ {
+ std::lock_guard <std::mutex> lock (m_mutex);
+
+ std::pair <std::set <Job>::iterator, bool> result (
+ m_jobSet.insert (Job (type, name, ++m_lastJob,
+ data.load (), func, m_cancelCallback)));
+ queueJob (*result.first, lock);
+ }
+}
+
+int
+JobQueue::getJobCount (JobType t) const
+{
+ std::lock_guard <std::mutex> lock (m_mutex);
+
+ JobDataMap::const_iterator c = m_jobData.find (t);
+
+ return (c == m_jobData.end ())
+ ? 0
+ : c->second.waiting;
+}
+
+int
+JobQueue::getJobCountTotal (JobType t) const
+{
+ std::lock_guard <std::mutex> lock (m_mutex);
+
+ JobDataMap::const_iterator c = m_jobData.find (t);
+
+ return (c == m_jobData.end ())
+ ? 0
+ : (c->second.waiting + c->second.running);
+}
+
+int
+JobQueue::getJobCountGE (JobType t) const
+{
+ // return the number of jobs at this priority level or greater
+ int ret = 0;
+
+ std::lock_guard <std::mutex> lock (m_mutex);
+
+ for (auto const& x : m_jobData)
+ {
+ if (x.first >= t)
+ ret += x.second.waiting;
+ }
+
+ return ret;
+}
+
+void
+JobQueue::shutdown ()
+{
+ m_journal.info << "Job queue shutting down";
+
+ m_workers.pauseAllThreadsAndWait ();
+}
+
+void
+JobQueue::setThreadCount (int c, bool const standaloneMode)
+{
+ if (standaloneMode)
+ {
+ c = 1;
+ }
+ else if (c == 0)
+ {
+ c = static_cast<int>(std::thread::hardware_concurrency());
+ c = 2 + std::min (c, 4); // I/O will bottleneck
+
+ m_journal.info << "Auto-tuning to " << c <<
+ " validation/transaction/proposal threads";
+ }
+
+ m_workers.setNumberOfThreads (c);
+}
+
+LoadEvent::pointer
+JobQueue::getLoadEvent (JobType t, std::string const& name)
+{
+ JobDataMap::iterator iter (m_jobData.find (t));
+ assert (iter != m_jobData.end ());
+
+ if (iter == m_jobData.end ())
+ return std::shared_ptr <LoadEvent> ();
+
+ return std::make_shared <LoadEvent> (
+ std::ref (iter-> second.load ()), name, true);
+}
+
+LoadEvent::autoptr
+JobQueue::getLoadEventAP (JobType t, std::string const& name)
+{
+ JobDataMap::iterator iter (m_jobData.find (t));
+ assert (iter != m_jobData.end ());
+
+ if (iter == m_jobData.end ())
+ return {};
+
+ return std::make_unique <LoadEvent> (iter-> second.load (), name, true);
+}
+
+void
+JobQueue::addLoadEvents (JobType t, int count,
+ std::chrono::milliseconds elapsed)
+{
+ JobDataMap::iterator iter (m_jobData.find (t));
+ assert (iter != m_jobData.end ());
+ iter->second.load().addSamples (count, elapsed);
+}
+
+bool
+JobQueue::isOverloaded ()
+{
+ int count = 0;
+
+ for (auto& x : m_jobData)
+ {
+ if (x.second.load ().isOver ())
+ ++count;
+ }
+
+ return count > 0;
+}
+
+Json::Value
+JobQueue::getJson (int c)
+{
+ Json::Value ret (Json::objectValue);
+
+ ret["threads"] = m_workers.getNumberOfThreads ();
+
+ Json::Value priorities = Json::arrayValue;
+
+ std::lock_guard <std::mutex> lock (m_mutex);
+
+ for (auto& x : m_jobData)
+ {
+ assert (x.first != jtINVALID);
+
+ if (x.first == jtGENERIC)
+ continue;
+
+ JobTypeData& data (x.second);
+
+ LoadMonitor::Stats stats (data.stats ());
+
+ int waiting (data.waiting);
+ int running (data.running);
+
+ if ((stats.count != 0) || (waiting != 0) ||
+ (stats.latencyPeak != 0) || (running != 0))
+ {
+ Json::Value& pri = priorities.append (Json::objectValue);
+
+ pri["job_type"] = data.name ();
+
+ if (stats.isOverloaded)
+ pri["over_target"] = true;
+
+ if (waiting != 0)
+ pri["waiting"] = waiting;
+
+ if (stats.count != 0)
+ pri["per_second"] = static_cast (stats.count);
+
+ if (stats.latencyPeak != 0)
+ pri["peak_time"] = static_cast (stats.latencyPeak);
+
+ if (stats.latencyAvg != 0)
+ pri["avg_time"] = static_cast (stats.latencyAvg);
+
+ if (running != 0)
+ pri["in_progress"] = running;
+ }
+ }
+
+ ret["job_types"] = priorities;
+
+ return ret;
+}
+
+Job*
+JobQueue::getJobForThread (std::thread::id const& id) const
+{
+ auto tid = (id == std::thread::id()) ? std::this_thread::get_id() : id;
+
+ std::lock_guard <std::mutex> lock (m_mutex);
+ auto i = m_threadIds.find (tid);
+ return (i == m_threadIds.end()) ? nullptr : i->second;
+}
+
+JobTypeData&
+JobQueue::getJobTypeData (JobType type)
+{
+ JobDataMap::iterator c (m_jobData.find (type));
+ assert (c != m_jobData.end ());
+
+ // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
+ // and use something sane.
+ if (c == m_jobData.end ())
+ return m_invalidJobData;
+
+ return c->second;
+}
+
+void
+JobQueue::checkStopped (std::lock_guard <std::mutex> const& lock)
+{
+ // We are stopped when all of the following are true:
+ //
+ // 1. A stop notification was received
+ // 2. All Stoppable children have stopped
+ // 3. There are no executing calls to processTask
+ // 4. There are no remaining Jobs in the job set
+ //
+ if (isStopping() &&
+ areChildrenStopped() &&
+ (m_processCount == 0) &&
+ m_jobSet.empty())
+ {
+ stopped();
+ }
+}
+
+void
+JobQueue::queueJob (Job const& job, std::lock_guard <std::mutex> const& lock)
+{
+ JobType const type (job.getType ());
+ assert (type != jtINVALID);
+ assert (m_jobSet.find (job) != m_jobSet.end ());
+
+ JobTypeData& data (getJobTypeData (type));
+
+ if (data.waiting + data.running < getJobLimit (type))
+ {
+ m_workers.addTask ();
+ }
+ else
+ {
+ // defer the task until we go below the limit
//
- if (isStopping() &&
- areChildrenStopped() &&
- (m_processCount == 0) &&
- m_jobSet.empty())
+ ++data.deferred;
+ }
+ ++data.waiting;
+}
+
+void
+JobQueue::getNextJob (Job& job)
+{
+ assert (! m_jobSet.empty ());
+
+ std::set <Job>::const_iterator iter;
+ for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter)
+ {
+ JobTypeData& data (getJobTypeData (iter->getType ()));
+
+ assert (data.running <= getJobLimit (data.type ()));
+
+ // Run this job if we're running below the limit.
+ if (data.running < getJobLimit (data.type ()))
{
- stopped();
+ assert (data.waiting > 0);
+ break;
}
}
- //--------------------------------------------------------------------------
- //
- // Signals an added Job for processing.
- //
- // Pre-conditions:
- // The JobType must be valid.
- // The Job must exist in mJobSet.
- // The Job must not have previously been queued.
- //
- // Post-conditions:
- // Count of waiting jobs of that type will be incremented.
- // If JobQueue exists, and has at least one thread, Job will eventually run.
- //
- // Invariants:
- // The calling thread owns the JobLock
- //
- void queueJob (Job const& job, ScopedLock const& lock)
+ assert (iter != m_jobSet.end ());
+
+ JobType const type = iter->getType ();
+ JobTypeData& data (getJobTypeData (type));
+
+ assert (type != jtINVALID);
+
+ job = *iter;
+ m_jobSet.erase (iter);
+
+ m_threadIds[std::this_thread::get_id()] = &job;
+
+ --data.waiting;
+ ++data.running;
+}
+
+void
+JobQueue::finishJob (Job const& job)
+{
+ JobType const type = job.getType ();
+
+ assert (m_jobSet.find (job) == m_jobSet.end ());
+ assert (type != jtINVALID);
+
+ JobTypeData& data (getJobTypeData (type));
+
+ // Queue a deferred task if possible
+ if (data.deferred > 0)
{
- JobType const type (job.getType ());
- assert (type != jtINVALID);
- assert (m_jobSet.find (job) != m_jobSet.end ());
+ assert (data.running + data.waiting >= getJobLimit (type));
- JobTypeData& data (getJobTypeData (type));
-
- if (data.waiting + data.running < getJobLimit (type))
- {
- m_workers.addTask ();
- }
- else
- {
- // defer the task until we go below the limit
- //
- ++data.deferred;
- }
- ++data.waiting;
+ --data.deferred;
+ m_workers.addTask ();
}
- //------------------------------------------------------------------------------
- //
- // Returns the next Job we should run now.
- //
- // RunnableJob:
- // A Job in the JobSet whose slots count for its type is greater than zero.
- //
- // Pre-conditions:
- // mJobSet must not be empty.
- // mJobSet holds at least one RunnableJob
- //
- // Post-conditions:
- // job is a valid Job object.
- // job is removed from mJobQueue.
- // Waiting job count of its type is decremented
- // Running job count of its type is incremented
- //
- // Invariants:
- // The calling thread owns the JobLock
- //
- void getNextJob (Job& job)
+ if (! m_threadIds.erase (std::this_thread::get_id()))
{
- assert (! m_jobSet.empty ());
+ assert (false);
+ }
+ --data.running;
+}
- JobSet::const_iterator iter;
- for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter)
- {
- JobTypeData& data (getJobTypeData (iter->getType ()));
+template <class Rep, class Period>
+void JobQueue::on_dequeue (JobType type,
+ std::chrono::duration <Rep, Period> const& value)
+{
+ auto const ms (ceil <std::chrono::milliseconds> (value));
- assert (data.running <= getJobLimit (data.type ()));
+ if (ms.count() >= 10)
+ getJobTypeData (type).dequeue.notify (ms);
+}
- // Run this job if we're running below the limit.
- if (data.running < getJobLimit (data.type ()))
- {
- assert (data.waiting > 0);
- break;
- }
- }
+template <class Rep, class Period>
+void JobQueue::on_execute (JobType type,
+ std::chrono::duration <Rep, Period> const& value)
+{
+ auto const ms (ceil <std::chrono::milliseconds> (value));
- assert (iter != m_jobSet.end ());
+ if (ms.count() >= 10)
+ getJobTypeData (type).execute.notify (ms);
+}
- JobType const type = iter->getType ();
- JobTypeData& data (getJobTypeData (type));
+void
+JobQueue::processTask ()
+{
+ Job job;
- assert (type != jtINVALID);
-
- job = *iter;
- m_jobSet.erase (iter);
-
- m_threadIds[std::this_thread::get_id()] = &job;
-
- --data.waiting;
- ++data.running;
+ {
+ std::lock_guard <std::mutex> lock (m_mutex);
+ getNextJob (job);
+ ++m_processCount;
}
- //------------------------------------------------------------------------------
+ JobTypeData& data (getJobTypeData (job.getType ()));
+
+ // Skip the job if we are stopping and the
+ // skipOnStop flag is set for the job type
//
- // Indicates that a running Job has completed its task.
- //
- // Pre-conditions:
- // Job must not exist in mJobSet.
- // The JobType must not be invalid.
- //
- // Post-conditions:
- // The running count of that JobType is decremented
- // A new task is signaled if there are more waiting Jobs than the limit, if any.
- //
- // Invariants:
- //
- //
- void finishJob (Job const& job)
+ if (!isStopping() || !data.info.skip ())
{
- JobType const type = job.getType ();
+ beast::Thread::setCurrentThreadName (data.name ());
+ m_journal.trace << "Doing " << data.name () << " job";
- assert (m_jobSet.find (job) == m_jobSet.end ());
- assert (type != jtINVALID);
+ Job::clock_type::time_point const start_time (
+ Job::clock_type::now());
- JobTypeData& data (getJobTypeData (type));
-
- // Queue a deferred task if possible
- if (data.deferred > 0)
- {
- assert (data.running + data.waiting >= getJobLimit (type));
-
- --data.deferred;
- m_workers.addTask ();
- }
-
- if (! m_threadIds.erase (std::this_thread::get_id()))
- {
- assert (false);
- }
- --data.running;
+ on_dequeue (job.getType (), start_time - job.queue_time ());
+ job.doJob ();
+ on_execute (job.getType (), Job::clock_type::now() - start_time);
+ }
+ else
+ {
+ m_journal.trace << "Skipping processTask ('" << data.name () << "')";
}
- //--------------------------------------------------------------------------
- template
- void on_dequeue (JobType type,
- std::chrono::duration const& value)
{
- auto const ms (ceil (value));
-
- if (ms.count() >= 10)
- getJobTypeData (type).dequeue.notify (ms);
- }
-
- template
- void on_execute (JobType type,
- std::chrono::duration const& value)
- {
- auto const ms (ceil (value));
-
- if (ms.count() >= 10)
- getJobTypeData (type).execute.notify (ms);
- }
-
- //--------------------------------------------------------------------------
- //
- // Runs the next appropriate waiting Job.
- //
- // Pre-conditions:
- // A RunnableJob must exist in the JobSet
- //
- // Post-conditions:
- // The chosen RunnableJob will have Job::doJob() called.
- //
- // Invariants:
- //
- //
- void processTask () override
- {
- Job job;
-
- {
- ScopedLock lock (m_mutex);
- getNextJob (job);
- ++m_processCount;
- }
-
- JobTypeData& data (getJobTypeData (job.getType ()));
-
- // Skip the job if we are stopping and the
- // skipOnStop flag is set for the job type
- //
- if (!isStopping() || !data.info.skip ())
- {
- beast::Thread::setCurrentThreadName (data.name ());
- m_journal.trace << "Doing " << data.name () << " job";
-
- Job::clock_type::time_point const start_time (
- Job::clock_type::now());
-
- on_dequeue (job.getType (), start_time - job.queue_time ());
- job.doJob ();
- on_execute (job.getType (), Job::clock_type::now() - start_time);
- }
- else
- {
- m_journal.trace << "Skipping processTask ('" << data.name () << "')";
- }
-
- {
- ScopedLock lock (m_mutex);
- finishJob (job);
- --m_processCount;
- checkStopped (lock);
- }
-
- // Note that when Job::~Job is called, the last reference
- // to the associated LoadEvent object (in the Job) may be destroyed.
- }
-
- //------------------------------------------------------------------------------
-
- // Returns `true` if all jobs of this type should be skipped when
- // the JobQueue receives a stop notification. If the job type isn't
- // skipped, the Job will be called and the job must call Job::shouldCancel
- // to determine if a long running or non-mandatory operation should be canceled.
- bool skipOnStop (JobType type)
- {
- JobTypeInfo const& j (getJobTypes ().get (type));
- assert (j.type () != jtINVALID);
-
- return j.skip ();
- }
-
- // Returns the limit of running jobs for the given job type.
- // For jobs with no limit, we return the largest int. Hopefully that
- // will be enough.
- //
- int getJobLimit (JobType type)
- {
- JobTypeInfo const& j (getJobTypes ().get (type));
- assert (j.type () != jtINVALID);
-
- return j.limit ();
- }
-
- //--------------------------------------------------------------------------
-
- void onStop () override
- {
- // VFALCO NOTE I wanted to remove all the jobs that are skippable
- // but then the Workers count of tasks to process
- // goes wrong.
-
- /*
- {
- ScopedLock lock (m_mutex);
-
- // Remove all jobs whose type is skipOnStop
- using JobDataMap = hash_map ;
- JobDataMap counts;
- bool const report (m_journal.debug.active());
-
- for (JobSet::const_iterator iter (m_jobSet.begin());
- iter != m_jobSet.end();)
- {
- if (skipOnStop (iter->getType()))
- {
- if (report)
- {
- std::pair result (
- counts.insert (std::make_pair (iter->getType(), 1)));
- if (! result.second)
- ++(result.first->second);
- }
-
- iter = m_jobSet.erase (iter);
- }
- else
- {
- ++iter;
- }
- }
-
- if (report)
- {
- beast::Journal::ScopedStream s (m_journal.debug);
-
- for (JobDataMap::const_iterator iter (counts.begin());
- iter != counts.end(); ++iter)
- {
- s << std::endl <<
- "Removed " << iter->second <<
- " skiponStop jobs of type " << Job::toString (iter->first);
- }
- }
- }
- */
- }
-
- void onChildrenStopped () override
- {
- ScopedLock lock (m_mutex);
-
+ std::lock_guard <std::mutex> lock (m_mutex);
+ finishJob (job);
+ --m_processCount;
checkStopped (lock);
}
-};
-//------------------------------------------------------------------------------
+ // Note that when Job::~Job is called, the last reference
+ // to the associated LoadEvent object (in the Job) may be destroyed.
+}
-JobQueue::JobQueue (char const* name, Stoppable& parent)
- : Stoppable (name, parent)
+bool
+JobQueue::skipOnStop (JobType type)
{
+ JobTypeInfo const& j (getJobTypes ().get (type));
+ assert (j.type () != jtINVALID);
+
+ return j.skip ();
}
-//------------------------------------------------------------------------------
-
-std::unique_ptr make_JobQueue (
- beast::insight::Collector::ptr const& collector,
- beast::Stoppable& parent, beast::Journal journal, Logs& logs)
+int
+JobQueue::getJobLimit (JobType type)
{
- return std::make_unique (collector, parent, journal, logs);
+ JobTypeInfo const& j (getJobTypes ().get (type));
+ assert (j.type () != jtINVALID);
+
+ return j.limit ();
+}
+
+void
+JobQueue::onStop ()
+{
+ // VFALCO NOTE I wanted to remove all the jobs that are skippable
+ // but then the Workers count of tasks to process
+ // goes wrong.
+
+ /*
+ {
+ std::lock_guard lock (m_mutex);
+
+ // Remove all jobs whose type is skipOnStop
+ using JobDataMap = hash_map ;
+ JobDataMap counts;
+ bool const report (m_journal.debug.active());
+
+ for (std::set ::const_iterator iter (m_jobSet.begin());
+ iter != m_jobSet.end();)
+ {
+ if (skipOnStop (iter->getType()))
+ {
+ if (report)
+ {
+ std::pair result (
+ counts.insert (std::make_pair (iter->getType(), 1)));
+ if (! result.second)
+ ++(result.first->second);
+ }
+
+ iter = m_jobSet.erase (iter);
+ }
+ else
+ {
+ ++iter;
+ }
+ }
+
+ if (report)
+ {
+ beast::Journal::ScopedStream s (m_journal.debug);
+
+ for (JobDataMap::const_iterator iter (counts.begin());
+ iter != counts.end(); ++iter)
+ {
+ s << std::endl <<
+ "Removed " << iter->second <<
+ " skiponStop jobs of type " << Job::toString (iter->first);
+ }
+ }
+ }
+ */
+}
+
+void
+JobQueue::onChildrenStopped ()
+{
+ std::lock_guard <std::mutex> lock (m_mutex);
+ checkStopped (lock);
}
}
diff --git a/src/ripple/core/tests/Coroutine.test.cpp b/src/ripple/core/tests/Coroutine.test.cpp
new file mode 100644
index 000000000..f970bad48
--- /dev/null
+++ b/src/ripple/core/tests/Coroutine.test.cpp
@@ -0,0 +1,117 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+namespace test {
+
+class Coroutine_test : public beast::unit_test::suite
+{
+public:
+ void
+ test_coroutine()
+ {
+ using namespace std::chrono_literals;
+ using namespace jtx;
+ Env env(*this);
+ std::atomic<int> i{0};
+ std::condition_variable cv;
+ auto& jq = env.app().getJobQueue();
+ jq.setThreadCount(0, false);
+ jq.postCoro(jtCLIENT, "Coroutine-Test",
+ [&](std::shared_ptr<JobCoro> jc)
+ {
+ std::thread t(
+ [&i, jc]()
+ {
+ std::this_thread::sleep_for(20ms);
+ ++i;
+ jc->post();
+ });
+ jc->yield();
+ t.join();
+ ++i;
+ cv.notify_one();
+ });
+
+ {
+ std::mutex m;
+ std::unique_lock<std::mutex> lk(m);
+ expect(cv.wait_for(lk, 1s,
+ [&]()
+ {
+ return i == 2;
+ }));
+ }
+ jq.shutdown();
+ expect(i == 2);
+ }
+
+ void
+ test_incorrect_order()
+ {
+ using namespace std::chrono_literals;
+ using namespace jtx;
+ Env env(*this);
+ std::atomic<int> i{0};
+ std::condition_variable cv;
+ auto& jq = env.app().getJobQueue();
+ jq.setThreadCount(0, false);
+ jq.postCoro(jtCLIENT, "Coroutine-Test",
+ [&](std::shared_ptr<JobCoro> jc)
+ {
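+ // Deliberately post() before yield(): the JobCoro mutex serializes
+ // resumptions, so this ordering is still safe (see the note in
+ // JobQueue.h).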
+ jc->post();
+ jc->yield();
+ ++i;
+ cv.notify_one();
+ });
+
+ {
+ std::mutex m;
+ std::unique_lock<std::mutex> lk(m);
+ expect(cv.wait_for(lk, 1s,
+ [&]()
+ {
+ return i == 1;
+ }));
+ }
+ jq.shutdown();
+ expect(i == 1);
+ }
+
+ void
+ run()
+ {
+ test_coroutine();
+ test_incorrect_order();
+ }
+};
+
+BEAST_DEFINE_TESTSUITE(Coroutine,core,ripple);
+
+} // test
+} // ripple
diff --git a/src/ripple/rpc/Context.h b/src/ripple/rpc/Context.h
index 8efd4d57d..1f44b6037 100644
--- a/src/ripple/rpc/Context.h
+++ b/src/ripple/rpc/Context.h
@@ -21,8 +21,8 @@
#define RIPPLE_RPC_CONTEXT_H_INCLUDED
#include
+#include
#include
-#include
#include
#include
@@ -46,7 +46,7 @@ struct Context
NetworkOPs& netOps;
LedgerMaster& ledgerMaster;
Role role;
- JobQueueSuspender suspend;
+ std::shared_ptr<JobCoro> jobCoro;
InfoSub::pointer infoSub;
NodeStore::ScopedMetrics metrics;
};
@@ -54,6 +54,4 @@ struct Context
} // RPC
} // ripple
-
-
#endif
diff --git a/src/ripple/rpc/RPCHandler.h b/src/ripple/rpc/RPCHandler.h
index e5895c217..e53503330 100644
--- a/src/ripple/rpc/RPCHandler.h
+++ b/src/ripple/rpc/RPCHandler.h
@@ -29,13 +29,12 @@ namespace ripple {
namespace RPC {
struct Context;
-struct YieldStrategy;
/** Execute an RPC command and store the results in a Json::Value. */
-Status doCommand (RPC::Context&, Json::Value&, YieldStrategy const& s = {});
+Status doCommand (RPC::Context&, Json::Value&);
/** Execute an RPC command and store the results in an std::string. */
-void executeRPC (RPC::Context&, std::string&, YieldStrategy const& s = {});
+void executeRPC (RPC::Context&, std::string&);
Role roleRequired (std::string const& method );
diff --git a/src/ripple/rpc/Yield.h b/src/ripple/rpc/Yield.h
deleted file mode 100644
index 1b507806c..000000000
--- a/src/ripple/rpc/Yield.h
+++ /dev/null
@@ -1,135 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#ifndef RIPPLE_RPC_YIELD_H_INCLUDED
-#define RIPPLE_RPC_YIELD_H_INCLUDED
-
-#include
-#include
-#include
-#include
-#include
-
-namespace ripple {
-
-class Application;
-class BasicConfig;
-class JobQueue;
-class Section;
-
-namespace RPC {
-
-/** See the README.md in this directory for more information about how
- the RPC yield mechanism works.
- */
-
-/** Callback: do something and eventually return. Can't be empty. */
-using Callback = std::function ;
-
-/** Continuation: do something, guarantee to eventually call Callback.
- Can't be empty. */
-using Continuation = std::function ;
-
-/** Suspend: suspend execution, pending completion of a Continuation.
- Can't be empty. */
-using Suspend = std::function ;
-
-/** A non-empty Suspend that immediately calls its callback. */
-extern
-Suspend const dontSuspend;
-
-/** Wrap an Output so it yields after approximately `chunkSize` bytes.
-
- chunkedYieldingOutput() only yields after a call to output(), so there might
- more than chunkSize bytes sent between calls to yield().
-
- chunkedYieldingOutput() also only yields before it's about to output more
- data. This is to avoid the case where you yield after outputting data, but
- then never send more data.
- */
-Json::Output chunkedYieldingOutput (
- Json::Output const&, Callback const&, std::size_t chunkSize);
-
-/** Yield every yieldCount calls. If yieldCount is 0, never yield. */
-class CountedYield
-{
-public:
- CountedYield (std::size_t yieldCount, Callback const& yield);
- void yield();
-
-private:
- std::size_t count_ = 0;
- std::size_t const yieldCount_;
- Callback const yield_;
-};
-
-enum class UseCoroutines {no, yes};
-
-/** When do we yield when performing a ledger computation? */
-struct YieldStrategy
-{
- enum class Streaming {no, yes};
-
- /** Is the data streamed, or generated monolithically? */
- Streaming streaming = Streaming::no;
-
- /** Are results generated in a coroutine? If this is no, then the code can
- never yield. */
- UseCoroutines useCoroutines = UseCoroutines::no;
-
- /** How many accounts do we process before yielding? 0 means "never yield
- due to number of accounts processed." */
- std::size_t accountYieldCount = 0;
-
- /** How many transactions do we process before yielding? 0 means "never
- yield due to number of transactions processed." */
- std::size_t transactionYieldCount = 0;
-};
-
-/** Does a BasicConfig require the use of coroutines? */
-UseCoroutines useCoroutines(BasicConfig const&);
-
-/** Create a yield strategy from a BasicConfig. */
-YieldStrategy makeYieldStrategy(BasicConfig const&);
-
-/** JobQueueSuspender is a suspend, with a yield that reschedules the job
- on the job queue. */
-struct JobQueueSuspender
-{
- /** Possibly suspend current execution. */
- Suspend const suspend;
-
- /** Possibly yield and restart on the job queue. */
- Callback const yield;
-
- /** Create a JobQueueSuspender where yield does nothing and the suspend
- immediately executes the continuation. */
- JobQueueSuspender(Application&);
-
- /** Create a JobQueueSuspender with a Suspend.
-
- When yield is called, it reschedules the current job on the JobQueue
- with the given jobName. */
- JobQueueSuspender(Application&, Suspend const&, std::string const& jobName);
-};
-
-} // RPC
-} // ripple
-
-#endif
diff --git a/src/ripple/rpc/handlers/LedgerHandler.h b/src/ripple/rpc/handlers/LedgerHandler.h
index 29c9dce7c..e78096e5f 100644
--- a/src/ripple/rpc/handlers/LedgerHandler.h
+++ b/src/ripple/rpc/handlers/LedgerHandler.h
@@ -29,7 +29,6 @@
#include
#include
#include
-#include
#include
#include
@@ -87,22 +86,21 @@ private:
template <class Object>
void LedgerHandler::writeResult (Object& value)
{
- auto& yield = context_.suspend.yield;
if (ledger_)
{
Json::copyFrom (value, result_);
- addJson (value, {*ledger_, options_, yield});
+ addJson (value, {*ledger_, options_});
}
else
{
auto& master = context_.app.getLedgerMaster ();
{
auto&& closed = Json::addObject (value, jss::closed);
- addJson (closed, {*master.getClosedLedger(), 0, yield});
+ addJson (closed, {*master.getClosedLedger(), 0});
}
{
auto&& open = Json::addObject (value, jss::open);
- addJson (open, {*master.getCurrentLedger(), 0, yield});
+ addJson (open, {*master.getCurrentLedger(), 0});
}
}
}
diff --git a/src/ripple/rpc/handlers/RipplePathFind.cpp b/src/ripple/rpc/handlers/RipplePathFind.cpp
index 272624bc7..d59230261 100644
--- a/src/ripple/rpc/handlers/RipplePathFind.cpp
+++ b/src/ripple/rpc/handlers/RipplePathFind.cpp
@@ -73,10 +73,6 @@ Json::Value doRipplePathFind (RPC::Context& context)
if (context.app.config().PATH_SEARCH_MAX == 0)
return rpcError (rpcNOT_SUPPORTED);
- RPC::LegacyPathFind lpf (context.role == Role::ADMIN, context.app);
- if (!lpf.isOk ())
- return rpcError (rpcTOO_BUSY);
-
context.loadType = Resource::feeHighBurdenRPC;
AccountID raSrc;
@@ -86,8 +82,7 @@ Json::Value doRipplePathFind (RPC::Context& context)
Json::Value jvResult;
- if (true || // TODO MPORTILLA temp fix to disable broken websocket coroutines
- context.app.config().RUN_STANDALONE ||
+ if (context.app.config().RUN_STANDALONE ||
context.params.isMember(jss::ledger) ||
context.params.isMember(jss::ledger_index) ||
context.params.isMember(jss::ledger_hash))
@@ -105,28 +100,27 @@ Json::Value doRipplePathFind (RPC::Context& context)
return rpcError (rpcNO_NETWORK);
}
+ PathRequest::pointer request;
context.loadType = Resource::feeHighBurdenRPC;
lpLedger = context.ledgerMaster.getClosedLedger();
- PathRequest::pointer request;
- context.suspend.suspend(
- [&request, &context, &jvResult, &lpLedger]
- (RPC::Callback const& callback)
- {
- jvResult = context.app.getPathRequests().makeLegacyPathRequest (
- request, callback, lpLedger, context.params);
- assert(callback);
- if (! request && callback)
- callback();
- });
-
+ jvResult = context.app.getPathRequests().makeLegacyPathRequest (
+ request, std::bind(&JobCoro::post, context.jobCoro),
+ lpLedger, context.params);
if (request)
+ {
+ context.jobCoro->yield();
jvResult = request->doStatus (context.params);
+ }
return jvResult;
}
- if (!context.params.isMember (jss::source_account))
+ RPC::LegacyPathFind lpf (context.role == Role::ADMIN, context.app);
+ if (! lpf.isOk ())
+ return rpcError (rpcTOO_BUSY);
+
+ if (! context.params.isMember (jss::source_account))
{
jvResult = rpcError (rpcSRC_ACT_MISSING);
}
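
(The pattern replacing the removed Suspend machinery, as used in the hunk above: hand the asynchronous API a resume callback built from JobCoro::post, then park the coroutine with JobCoro::yield() until that callback fires. A hedged sketch of the shape; `startAsyncWork` is a stand-in for any callback-taking API and is not a rippled function:)

    // Illustrative only -- not part of this patch.
    jobQueue.postCoro (jtCLIENT, "example-coro",
        [] (std::shared_ptr<JobCoro> jobCoro)
        {
            startAsyncWork (std::bind (&JobCoro::post, jobCoro));  // resume later
            jobCoro->yield ();   // suspend here until post() runs
            // ... continue with the result of the asynchronous work ...
        });
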
diff --git a/src/ripple/rpc/impl/Coroutine.cpp b/src/ripple/rpc/impl/Coroutine.cpp
deleted file mode 100644
index 5e71ad1d2..000000000
--- a/src/ripple/rpc/impl/Coroutine.cpp
+++ /dev/null
@@ -1,82 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#include
-#include
-#include
-#include
-
-namespace ripple {
-namespace RPC {
-namespace {
-
-using CoroutineType = Continuation;
-using BoostCoroutine = boost::coroutines::asymmetric_coroutine<CoroutineType>;
-using Pull = BoostCoroutine::pull_type;
-using Push = BoostCoroutine::push_type;
-
-void runOnCoroutineImpl(std::shared_ptr<Pull> pull)
-{
- while (*pull)
- {
- (*pull)();
-
- if (! *pull)
- return;
-
- if (auto continuation = pull->get())
- {
- continuation ([pull] () { runOnCoroutineImpl(pull); });
- return;
- }
- }
-}
-
-} // namespace
-
-void runOnCoroutine(Coroutine const& coroutine)
-{
- auto pullFunction = [coroutine] (Push& push)
- {
- Suspend suspend = [&push] (CoroutineType const& cbc)
- {
- if (push)
- push (cbc);
- };
-
- // Run once doing nothing, to get the other side started.
- suspend([] (Callback const& callback) { callback(); });
-
- // Now run the coroutine.
- coroutine(suspend);
- };
-
- runOnCoroutineImpl(std::make_shared<Pull>(pullFunction));
-}
-
-void runOnCoroutine(UseCoroutines useCoroutines, Coroutine const& coroutine)
-{
- if (useCoroutines == UseCoroutines::yes)
- runOnCoroutine(coroutine);
- else
- coroutine(dontSuspend);
-}
-
-} // RPC
-} // ripple
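
(The deleted Coroutine.cpp was a thin wrapper over Boost's asymmetric coroutines: the coroutine body pushes a Continuation each time it suspends, and runOnCoroutineImpl pulls it and arranges to be re-entered. A standalone example of that pull/push handshake, using plain ints rather than rippled types:)

    // Illustrative only -- not part of this patch.
    #include <boost/coroutine/asymmetric_coroutine.hpp>
    #include <iostream>

    int main ()
    {
        using Coro = boost::coroutines::asymmetric_coroutine<int>;
        Coro::pull_type pull ([] (Coro::push_type& push)
        {
            for (int i = 0; i < 3; ++i)
                push (i);            // suspend, handing i to the caller
        });
        while (pull)                 // a pushed value is available
        {
            std::cout << pull.get () << '\n';
            pull ();                 // resume the coroutine body
        }
    }
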
diff --git a/src/ripple/rpc/impl/RPCHandler.cpp b/src/ripple/rpc/impl/RPCHandler.cpp
index 2b19dce87..c0be57864 100644
--- a/src/ripple/rpc/impl/RPCHandler.cpp
+++ b/src/ripple/rpc/impl/RPCHandler.cpp
@@ -20,7 +20,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -223,7 +222,7 @@ void getResult (
} // namespace
Status doCommand (
- RPC::Context& context, Json::Value& result, YieldStrategy const&)
+ RPC::Context& context, Json::Value& result)
{
boost::optional handler;
if (auto error = fillHandler (context, handler))
@@ -240,7 +239,7 @@ Status doCommand (
/** Execute an RPC command and store the results in a string. */
void executeRPC (
- RPC::Context& context, std::string& output, YieldStrategy const& strategy)
+ RPC::Context& context, std::string& output)
{
boost::optional handler;
if (auto error = fillHandler (context, handler))
@@ -258,10 +257,7 @@ void executeRPC (
{
auto object = Json::Value (Json::objectValue);
getResult (context, method, object, handler->name_);
- if (strategy.streaming == YieldStrategy::Streaming::yes)
- output = jsonAsString (object);
- else
- output = to_string (object);
+ output = to_string (object);
}
else
{
diff --git a/src/ripple/rpc/impl/Yield.cpp b/src/ripple/rpc/impl/Yield.cpp
deleted file mode 100644
index 6f6251096..000000000
--- a/src/ripple/rpc/impl/Yield.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#include
-#include
-#include
-#include
-#include
-
-namespace ripple {
-namespace RPC {
-
-static
-UseCoroutines defaultUseCoroutines = UseCoroutines::no;
-
-Suspend const dontSuspend = [] (Continuation const& continuation)
-{
- continuation([] () {});
-};
-
-namespace {
-
-void runOnJobQueue(
- Application& app, std::string const& name, Callback const& callback)
-{
- auto cb = [callback] (Job&) { callback(); };
- app.getJobQueue().addJob(jtCLIENT, name, cb);
-};
-
-Callback suspendForJobQueue(
- Application& app, Suspend const& suspend, std::string const& jobName)
-{
- assert(suspend);
- return Callback( [suspend, jobName, &app] () {
- suspend([jobName, &app] (Callback const& callback) {
- runOnJobQueue(app, jobName, callback);
- });
- });
-}
-
-} // namespace
-
-Json::Output chunkedYieldingOutput (
- Json::Output const& output, Callback const& yield, std::size_t chunkSize)
-{
- if (!yield)
- return output;
-
- auto count = std::make_shared<std::size_t> (0);
- return [chunkSize, count, output, yield] (boost::string_ref const& bytes)
- {
- if (*count > chunkSize)
- {
- yield();
- *count = 0;
- }
- output (bytes);
- *count += bytes.size();
- };
-}
-
-CountedYield::CountedYield (std::size_t yieldCount, Callback const& yield)
- : yieldCount_ (yieldCount), yield_ (yield)
-{
-}
-
-void CountedYield::yield()
-{
- if (yieldCount_ && yield_)
- {
- if (++count_ >= yieldCount_)
- {
- yield_();
- count_ = 0;
- }
- }
-}
-
-UseCoroutines useCoroutines(BasicConfig const& config)
-{
- if (auto use = config["section"].get<bool>("use_coroutines"))
- return *use ? UseCoroutines::yes : UseCoroutines::no;
- return defaultUseCoroutines;
-}
-
-YieldStrategy makeYieldStrategy (BasicConfig const& config)
-{
- auto s = config["section"];
- YieldStrategy ys;
- ys.streaming = get<bool> (s, "streaming") ?
- YieldStrategy::Streaming::yes :
- YieldStrategy::Streaming::no;
- ys.useCoroutines = useCoroutines(config);
- ys.accountYieldCount = get<std::size_t> (s, "account_yield_count");
- ys.transactionYieldCount = get<std::size_t> (s, "transaction_yield_count");
-
- return ys;
-}
-
-JobQueueSuspender::JobQueueSuspender(
- Application& app, Suspend const& susp, std::string const& jobName)
- : suspend(susp ? susp : dontSuspend),
- yield(suspendForJobQueue(app, suspend, jobName))
-{
- // There's a non-empty jobName exactly if there's a non-empty Suspend.
- assert(!(susp && jobName.empty()));
-}
-
-JobQueueSuspender::JobQueueSuspender(Application &app)
- : JobQueueSuspender(app, {}, {})
-{
-}
-
-} // RPC
-} // ripple
diff --git a/src/ripple/rpc/tests/Coroutine.test.cpp b/src/ripple/rpc/tests/Coroutine.test.cpp
deleted file mode 100644
index a0344745d..000000000
--- a/src/ripple/rpc/tests/Coroutine.test.cpp
+++ /dev/null
@@ -1,133 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#include
-#include
-#include
-#include
-
-namespace ripple {
-namespace RPC {
-
-class Coroutine_test : public TestOutputSuite
-{
-public:
- using Strings = std::vector <std::string>;
-
- void test (int chunkSize, Strings const& expected)
- {
- auto name = std::to_string (chunkSize);
- setup (name);
-
- std::string buffer;
- Json::Output output = Json::stringOutput (buffer);
-
- auto makeContinuation = [&] (std::string const& data) {
- return Continuation ([=] (Callback const& cb) {
- output (data + " ");
- cb();
- });
- };
-
- Strings result;
- Coroutine coroutine ([&] (Suspend const& suspend)
- {
- Callback yield ([=] () { suspend (makeContinuation ("*")); });
- auto out = chunkedYieldingOutput (output, yield, chunkSize);
- out ("hello ");
- result.push_back (buffer);
-
- suspend (makeContinuation("HELLO"));
- result.push_back (buffer);
-
- out ("there ");
- result.push_back (buffer);
-
- suspend (makeContinuation("THERE"));
- result.push_back (buffer);
-
- out ("world ");
- result.push_back (buffer);
-
- suspend (makeContinuation("WORLD"));
- result.push_back (buffer);
- });
-
- runOnCoroutine(UseCoroutines::yes, coroutine);
- expectCollectionEquals (result, expected);
- }
-
- void run() override
- {
- test (0, {"hello ",
- "hello HELLO ",
- "hello HELLO * there ",
- "hello HELLO * there THERE ",
- "hello HELLO * there THERE * world ",
- "hello HELLO * there THERE * world WORLD "
- });
- test (3, {"hello ",
- "hello HELLO ",
- "hello HELLO * there ",
- "hello HELLO * there THERE ",
- "hello HELLO * there THERE * world ",
- "hello HELLO * there THERE * world WORLD "
- });
- test (5, {"hello ",
- "hello HELLO ",
- "hello HELLO * there ",
- "hello HELLO * there THERE ",
- "hello HELLO * there THERE * world ",
- "hello HELLO * there THERE * world WORLD "
- });
- test (7, {"hello ",
- "hello HELLO ",
- "hello HELLO there ",
- "hello HELLO there THERE ",
- "hello HELLO there THERE * world ",
- "hello HELLO there THERE * world WORLD "
- });
- test (10, {"hello ",
- "hello HELLO ",
- "hello HELLO there ",
- "hello HELLO there THERE ",
- "hello HELLO there THERE * world ",
- "hello HELLO there THERE * world WORLD "
- });
- test (13, {"hello ",
- "hello HELLO ",
- "hello HELLO there ",
- "hello HELLO there THERE ",
- "hello HELLO there THERE world ",
- "hello HELLO there THERE world WORLD "
- });
- test (15, {"hello ",
- "hello HELLO ",
- "hello HELLO there ",
- "hello HELLO there THERE ",
- "hello HELLO there THERE world ",
- "hello HELLO there THERE world WORLD "
- });
- }
-};
-
-BEAST_DEFINE_TESTSUITE(Coroutine, RPC, ripple);
-
-} // RPC
-} // ripple
diff --git a/src/ripple/rpc/tests/Yield.test.cpp b/src/ripple/rpc/tests/Yield.test.cpp
deleted file mode 100644
index 7bb26dc56..000000000
--- a/src/ripple/rpc/tests/Yield.test.cpp
+++ /dev/null
@@ -1,107 +0,0 @@
-//------------------------------------------------------------------------------
-/*
- This file is part of rippled: https://github.com/ripple/rippled
- Copyright (c) 2012, 2013 Ripple Labs Inc.
-
- Permission to use, copy, modify, and/or distribute this software for any
- purpose with or without fee is hereby granted, provided that the above
- copyright notice and this permission notice appear in all copies.
-
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#include
-#include
-#include
-
-namespace ripple {
-namespace RPC {
-
-struct Yield_test : TestOutputSuite
-{
- void chunkedYieldingTest ()
- {
- setup ("chunkedYieldingTest");
- std::string lastYield;
-
- auto yield = [&]() { lastYield = output_; };
- auto output = chunkedYieldingOutput (
- Json::stringOutput (output_), yield, 5);
- output ("hello");
- expectResult ("hello");
- expectEquals (lastYield, "");
-
- output (", th"); // Goes over the boundary.
- expectResult ("hello, th");
- expectEquals (lastYield, "");
-
- output ("ere!"); // Forces a yield.
- expectResult ("hello, there!");
- expectEquals (lastYield, "hello, th");
-
- output ("!!");
- expectResult ("hello, there!!!");
- expectEquals (lastYield, "hello, th");
-
- output (""); // Forces a yield.
- expectResult ("hello, there!!!");
- expectEquals (lastYield, "hello, there!!!");
- }
-
- void trivialCountedYieldTest()
- {
- setup ("trivialCountedYield");
-
- auto didYield = false;
- auto yield = [&]() { didYield = true; };
-
- CountedYield cy (0, yield);
-
- for (auto i = 0; i < 4; ++i)
- {
- cy.yield();
- expect (!didYield, "We yielded when we shouldn't have.");
- }
- }
-
- void countedYieldTest()
- {
- setup ("countedYield");
-
- auto didYield = false;
- auto yield = [&]() { didYield = true; };
-
- CountedYield cy (5, yield);
-
- for (auto j = 0; j < 3; ++j)
- {
- for (auto i = 0; i < 4; ++i)
- {
- cy.yield();
- expect (!didYield, "We yielded when we shouldn't have.");
- }
- cy.yield();
- expect (didYield, "We didn't yield");
- didYield = false;
- }
- }
-
- void run () override
- {
- chunkedYieldingTest();
- trivialCountedYieldTest();
- countedYieldTest();
- }
-};
-
-BEAST_DEFINE_TESTSUITE(Yield, ripple_basics, ripple);
-
-} // RPC
-} // ripple
diff --git a/src/ripple/server/ServerHandler.h b/src/ripple/server/ServerHandler.h
index 25293e4e3..2b92d7991 100644
--- a/src/ripple/server/ServerHandler.h
+++ b/src/ripple/server/ServerHandler.h
@@ -23,7 +23,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -67,7 +66,6 @@ public:
};
overlay_t overlay;
- RPC::YieldStrategy yieldStrategy;
void
makeContexts();
diff --git a/src/ripple/server/impl/ServerHandlerImp.cpp b/src/ripple/server/impl/ServerHandlerImp.cpp
index bb8f7a8d6..927ab17ef 100644
--- a/src/ripple/server/impl/ServerHandlerImp.cpp
+++ b/src/ripple/server/impl/ServerHandlerImp.cpp
@@ -32,7 +32,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -179,17 +178,11 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
return;
}
- auto detach = session.detach();
-
- // We can copy `this` because ServerHandlerImp is a long-lasting singleton.
- auto job = [this, detach] (Job&) {
- RPC::runOnCoroutine(
- setup_.yieldStrategy.useCoroutines,
- [this, detach] (RPC::Suspend const& suspend) {
- processSession(detach, suspend);
- });
- };
- m_jobQueue.addJob(jtCLIENT, "RPC-Client", job);
+ m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
+ [this, detach = session.detach()](std::shared_ptr<JobCoro> jc)
+ {
+ processSession(detach, jc);
+ });
}
void
@@ -208,15 +201,11 @@ ServerHandlerImp::onStopped (HTTP::Server&)
// Run as a coroutine.
void
-ServerHandlerImp::processSession (
- std::shared_ptr const& session, Suspend const& suspend)
+ServerHandlerImp::processSession (std::shared_ptr const& session,
+ std::shared_ptr<JobCoro> jobCoro)
{
- processRequest (
- session->port(),
- to_string (session->body()),
- session->remoteAddress().at_port (0),
- makeOutput (*session),
- suspend);
+ processRequest (session->port(), to_string (session->body()),
+ session->remoteAddress().at_port (0), makeOutput (*session), jobCoro);
if (session->request().keep_alive())
session->complete();
@@ -225,12 +214,9 @@ ServerHandlerImp::processSession (
}
void
-ServerHandlerImp::processRequest (
- HTTP::Port const& port,
- std::string const& request,
- beast::IP::Endpoint const& remoteIPAddress,
- Output&& output,
- Suspend const& suspend)
+ServerHandlerImp::processRequest (HTTP::Port const& port,
+ std::string const& request, beast::IP::Endpoint const& remoteIPAddress,
+ Output&& output, std::shared_ptr<JobCoro> jobCoro)
{
auto rpcJ = app_.journal ("RPC");
// Move off the webserver thread onto the JobQueue.
@@ -352,40 +338,29 @@ ServerHandlerImp::processRequest (
auto const start (std::chrono::high_resolution_clock::now ());
- RPC::Context context {
- m_journal, params, app_, loadType, m_networkOPs, app_.getLedgerMaster(), role,
- {app_, suspend, "RPC-Coroutine"}};
+ RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
+ app_.getLedgerMaster(), role, jobCoro};
+ Json::Value result;
+ RPC::doCommand (context, result);
- std::string response;
-
- if (setup_.yieldStrategy.streaming == RPC::YieldStrategy::Streaming::yes)
+ // Always report "status". On an error report the request as received.
+ if (result.isMember (jss::error))
{
- executeRPC (context, response, setup_.yieldStrategy);
+ result[jss::status] = jss::error;
+ result[jss::request] = params;
+ JLOG (m_journal.debug) <<
+ "rpcError: " << result [jss::error] <<
+ ": " << result [jss::error_message];
}
else
{
- Json::Value result;
- RPC::doCommand (context, result, setup_.yieldStrategy);
-
- // Always report "status". On an error report the request as received.
- if (result.isMember (jss::error))
- {
- result[jss::status] = jss::error;
- result[jss::request] = params;
- JLOG (m_journal.debug) <<
- "rpcError: " << result [jss::error] <<
- ": " << result [jss::error_message];
- }
- else
- {
- result[jss::status] = jss::success;
- }
-
- Json::Value reply (Json::objectValue);
- reply[jss::result] = std::move (result);
- response = to_string (reply);
+ result[jss::status] = jss::success;
}
+ Json::Value reply (Json::objectValue);
+ reply[jss::result] = std::move (result);
+ auto response = to_string (reply);
+
rpc_time_.notify (static_cast (
std::chrono::duration_cast (
std::chrono::high_resolution_clock::now () - start)));
@@ -747,7 +722,6 @@ setup_ServerHandler(BasicConfig const& config, std::ostream& log)
{
ServerHandler::Setup setup;
setup.ports = detail::parse_Ports(config, log);
- setup.yieldStrategy = RPC::makeYieldStrategy(config);
detail::setup_Client(setup);
detail::setup_Overlay(setup);
diff --git a/src/ripple/server/impl/ServerHandlerImp.h b/src/ripple/server/impl/ServerHandlerImp.h
index 11f6489be..a4dd3c356 100644
--- a/src/ripple/server/impl/ServerHandlerImp.h
+++ b/src/ripple/server/impl/ServerHandlerImp.h
@@ -21,6 +21,7 @@
#define RIPPLE_SERVER_SERVERHANDLERIMP_H_INCLUDED
#include
+#include
#include
#include
#include
@@ -57,7 +58,6 @@ public:
private:
using Output = Json::Output;
- using Suspend = RPC::Suspend;
void
setup (Setup const& setup, beast::Journal journal) override;
@@ -109,11 +109,13 @@ private:
//--------------------------------------------------------------------------
void
- processSession (std::shared_ptr const&, Suspend const&);
+ processSession (std::shared_ptr const&,
+ std::shared_ptr<JobCoro> jobCoro);
void
processRequest (HTTP::Port const& port, std::string const& request,
- beast::IP::Endpoint const& remoteIPAddress, Output&&, Suspend const&);
+ beast::IP::Endpoint const& remoteIPAddress, Output&&,
+ std::shared_ptr<JobCoro> jobCoro);
//
// PropertyStream
diff --git a/src/ripple/unity/core.cpp b/src/ripple/unity/core.cpp
index 3df0ebdbf..09e8faf55 100644
--- a/src/ripple/unity/core.cpp
+++ b/src/ripple/unity/core.cpp
@@ -29,5 +29,6 @@
#include
#include
-#include
#include
+#include
+#include
diff --git a/src/ripple/unity/rpcx.cpp b/src/ripple/unity/rpcx.cpp
index f2f83c240..22b395b22 100644
--- a/src/ripple/unity/rpcx.cpp
+++ b/src/ripple/unity/rpcx.cpp
@@ -26,10 +26,8 @@
#include
-#include
#include
#include
-#include
#include
#include
@@ -106,8 +104,6 @@
#include
#include
-#include
#include
#include
#include
-#include
diff --git a/src/ripple/websocket/Connection.h b/src/ripple/websocket/Connection.h
index a456a5703..e444276e0 100644
--- a/src/ripple/websocket/Connection.h
+++ b/src/ripple/websocket/Connection.h
@@ -24,6 +24,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -31,13 +32,11 @@
#include
#include
#include
-#include
#include
#include
#include
#include
#include
-#include
#include
#include