diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index 76041f25a..629ce32ba 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -4511,6 +4511,10 @@
True
True
+ <ClCompile Include="..\..\src\test\core\JobQueue_test.cpp">
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
+ </ClCompile>
True
True
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index c75f01036..244c56165 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -5265,6 +5265,9 @@
test\core
+ <ClCompile Include="..\..\src\test\core\JobQueue_test.cpp">
+ <Filter>test\core</Filter>
+ </ClCompile>
test\core
diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp
index f1de8f55a..14c352fc2 100644
--- a/src/ripple/app/consensus/RCLValidations.cpp
+++ b/src/ripple/app/consensus/RCLValidations.cpp
@@ -59,8 +59,8 @@ RCLValidationsPolicy::onStale(RCLValidation&& v)
if (staleWriting_)
return;
- staleWriting_ = true;
- app_.getJobQueue().addJob(
+ // addJob() may return false (Job not added) at shutdown.
+ staleWriting_ = app_.getJobQueue().addJob(
jtWRITE, "Validations::doStaleWrite", [this](Job&) {
auto event =
app_.getJobQueue().makeLoadEvent(jtDISK, "ValidationWrite");
diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp
index 2881a67fe..61543b9b9 100644
--- a/src/ripple/app/ledger/Ledger.cpp
+++ b/src/ripple/app/ledger/Ledger.cpp
@@ -1048,25 +1048,22 @@ bool pendSaveValidated (
return true;
}
- if (isSynchronous)
- return saveValidatedLedger(app, ledger, isCurrent);
+ JobType const jobType {isCurrent ? jtPUBLEDGER : jtPUBOLDLEDGER};
+ char const* const jobName {
+ isCurrent ? "Ledger::pendSave" : "Ledger::pendOldSave"};
- auto job = [ledger, &app, isCurrent] (Job&) {
- saveValidatedLedger(app, ledger, isCurrent);
- };
-
- if (isCurrent)
+ // See if we can use the JobQueue.
+ if (!isSynchronous &&
+ app.getJobQueue().addJob (jobType, jobName,
+ [&app, ledger, isCurrent] (Job&) {
+ saveValidatedLedger(app, ledger, isCurrent);
+ }))
{
- app.getJobQueue().addJob(
- jtPUBLEDGER, "Ledger::pendSave", job);
- }
- else
- {
- app.getJobQueue ().addJob(
- jtPUBOLDLEDGER, "Ledger::pendOldSave", job);
+ return true;
}
- return true;
+ // The JobQueue won't do the Job. Do the save synchronously.
+ return saveValidatedLedger(app, ledger, isCurrent);
}
void
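
The reworked pendSaveValidated() above is the pattern repeated throughout this change: offer the work to the JobQueue first, and fall back to doing it synchronously when the queue declines (which addJob() now reports by returning false at shutdown). A minimal standalone sketch of the pattern, with illustrative stand-ins (queueWork() and save() are not rippled APIs):

    #include <functional>

    // Illustrative stand-ins, not rippled APIs.
    static bool shuttingDown = false;

    bool queueWork(std::function<void()> task)
    {
        if (shuttingDown)
            return false;  // mirrors addJob() returning false at shutdown
        task();            // a real queue would run this on a worker thread
        return true;
    }

    bool save(bool isCurrent)
    {
        return true;  // placeholder for saveValidatedLedger()
    }

    bool pendSave(bool isSynchronous, bool isCurrent)
    {
        // Prefer the asynchronous path when the caller allows it.
        if (!isSynchronous && queueWork([isCurrent] { save(isCurrent); }))
            return true;

        // The queue won't take the job; save on this thread instead.
        return save(isCurrent);
    }
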
diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h
index 21006d361..9f522e1b4 100644
--- a/src/ripple/app/ledger/LedgerMaster.h
+++ b/src/ripple/app/ledger/LedgerMaster.h
@@ -203,9 +203,9 @@ public:
void setBuildingLedger (LedgerIndex index);
void tryAdvance ();
- void newPathRequest ();
+ bool newPathRequest (); // Returns true if path request successfully placed.
bool isNewPathRequest ();
- void newOrderBookDB ();
+ bool newOrderBookDB (); // Returns true if able to fulfill request.
bool fixIndex (
LedgerIndex ledgerIndex, LedgerHash const& ledgerHash);
@@ -242,6 +242,9 @@ public:
std::size_t getFetchPackCacheSize () const;
private:
+ using ScopedLockType = std::lock_guard <std::recursive_mutex>;
+ using ScopedUnlockType = GenericScopedUnlock <std::recursive_mutex>;
+
void setValidLedger(
std::shared_ptr<Ledger const> const& l);
void setPubLedger(
@@ -255,8 +258,9 @@ private:
boost::optional<LedgerHash> getLedgerHashForHistory(LedgerIndex index);
std::size_t getNeededValidations();
void advanceThread();
- // Try to publish ledgers, acquire missing ledgers
- void doAdvance();
+ // Try to publish ledgers, acquire missing ledgers. Always called with
+ // m_mutex locked. The passed ScopedLockType is a reminder to callers.
+ void doAdvance(ScopedLockType&);
bool shouldFetchPack(std::uint32_t seq) const;
bool shouldAcquire(
std::uint32_t const currentLedger,
@@ -268,12 +272,12 @@ private:
findNewLedgersToPublish();
void updatePaths(Job& job);
- void newPFWork(const char *name);
+
+ // Returns true if work started. Always called with m_mutex locked.
+ // The passed ScopedLockType is a reminder to callers.
+ bool newPFWork(const char *name, ScopedLockType&);
private:
- using ScopedLockType = std::lock_guard <std::recursive_mutex>;
- using ScopedUnlockType = GenericScopedUnlock <std::recursive_mutex>;
-
Application& app_;
beast::Journal m_journal;
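
doAdvance() and newPFWork() now take a ScopedLockType& that they never read; the parameter exists so a caller cannot even compile a call without having a lock object in hand. A hedged sketch of the idiom (the class and members here are illustrative, not the real LedgerMaster):

    #include <mutex>

    class Advancer
    {
        using ScopedLockType = std::lock_guard <std::recursive_mutex>;

        std::recursive_mutex m_mutex;
        int m_work = 0;

        // The unnamed lock parameter reminds the caller, at the call
        // site, that m_mutex must already be held.
        void doAdvance(ScopedLockType&)
        {
            ++m_work;  // safe: the caller holds m_mutex
        }

    public:
        void tryAdvance()
        {
            ScopedLockType sl(m_mutex);
            doAdvance(sl);  // cannot be called without naming a lock
        }
    };
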
diff --git a/src/ripple/app/ledger/OrderBookDB.cpp b/src/ripple/app/ledger/OrderBookDB.cpp
index f28c0afb0..183007d9f 100644
--- a/src/ripple/app/ledger/OrderBookDB.cpp
+++ b/src/ripple/app/ledger/OrderBookDB.cpp
@@ -101,6 +101,15 @@ void OrderBookDB::update(
{
for(auto& sle : ledger->sles)
{
+ if (isStopping())
+ {
+ JLOG (j_.info())
+ << "OrderBookDB::update exiting due to isStopping";
+ std::lock_guard <std::recursive_mutex> sl (mLock);
+ mSeq = 0;
+ return;
+ }
+
if (sle->getType () == ltDIR_NODE &&
sle->isFieldPresent (sfExchangeRate) &&
sle->getFieldH256 (sfRootIndex) == sle->key())
diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp
index 5deed8de7..959ad2b6b 100644
--- a/src/ripple/app/ledger/impl/LedgerMaster.cpp
+++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp
@@ -919,7 +919,7 @@ LedgerMaster::advanceThread()
try
{
- doAdvance();
+ doAdvance(sl);
}
catch (std::exception const&)
{
@@ -1159,6 +1159,7 @@ LedgerMaster::updatePaths (Job& job)
{
JLOG (m_journal.debug())
<< "Published ledger too old for updating paths";
+ ScopedLockType ml (m_mutex);
--mPathFindThread;
return;
}
@@ -1193,48 +1194,51 @@ LedgerMaster::updatePaths (Job& job)
}
}
-void
+bool
LedgerMaster::newPathRequest ()
{
ScopedLockType ml (m_mutex);
- mPathFindNewRequest = true;
-
- newPFWork("pf:newRequest");
+ mPathFindNewRequest = newPFWork("pf:newRequest", ml);
+ return mPathFindNewRequest;
}
bool
LedgerMaster::isNewPathRequest ()
{
ScopedLockType ml (m_mutex);
- if (!mPathFindNewRequest)
- return false;
+ bool const ret = mPathFindNewRequest;
mPathFindNewRequest = false;
- return true;
+ return ret;
}
// If the order book is radically updated, we need to reprocess all
// pathfinding requests.
-void
+bool
LedgerMaster::newOrderBookDB ()
{
ScopedLockType ml (m_mutex);
mPathLedger.reset();
- newPFWork("pf:newOBDB");
+ return newPFWork("pf:newOBDB", ml);
}
/** A thread needs to be dispatched to handle pathfinding work of some kind.
*/
-void
-LedgerMaster::newPFWork (const char *name)
+bool
+LedgerMaster::newPFWork (const char *name, ScopedLockType&)
{
if (mPathFindThread < 2)
{
- ++mPathFindThread;
- app_.getJobQueue().addJob (
+ if (app_.getJobQueue().addJob (
jtUPDATE_PF, name,
- [this] (Job& j) { updatePaths(j); });
+ [this] (Job& j) { updatePaths(j); }))
+ {
+ ++mPathFindThread;
+ }
}
+ // If we're stopping, don't give callers the expectation that their
+ // request will be fulfilled, even though it may still be serviced.
+ return mPathFindThread > 0 && !isStopping();
}
std::recursive_mutex&
@@ -1520,7 +1524,7 @@ LedgerMaster::shouldAcquire (
}
// Try to publish ledgers, acquire missing ledgers
-void LedgerMaster::doAdvance ()
+void LedgerMaster::doAdvance (ScopedLockType& sl)
{
// TODO NIKB: simplify and unindent this a bit!
@@ -1707,9 +1711,8 @@ void LedgerMaster::doAdvance ()
}
}
- progress = true;
app_.getOPs().clearNeedNetworkLedger();
- newPFWork ("pf:newLedger");
+ progress = newPFWork ("pf:newLedger", sl);
}
if (progress)
mAdvanceWork = true;
diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp
index 23da237b5..f846a92a2 100644
--- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp
+++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp
@@ -82,6 +82,11 @@ void TransactionAcquire::done ()
uint256 const& hash (mHash);
std::shared_ptr<SHAMap> const& map (mMap);
auto const pap = &app_;
+ // Note that, when we're in the process of shutting down, addJob()
+ // may reject the request. If that happens then giveSet() will
+ // not be called. That's fine. According to David the giveSet() call
+ // just updates the consensus and related structures when we acquire
+ // a transaction set. No need to update them if we're shutting down.
app_.getJobQueue().addJob (jtTXN_DATA, "completeAcquire",
[pap, hash, map](Job&)
{
diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index a19ea27db..d952b1736 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -905,29 +905,27 @@ public:
if (timer == m_sweepTimer)
{
- // VFALCO TODO Move all this into doSweep
-
- if (! config_->standalone())
- {
- boost::filesystem::space_info space =
- boost::filesystem::space (config_->legacy ("database_path"));
-
- // VFALCO TODO Give this magic constant a name and move it into a well documented header
- //
- if (space.available < (512 * 1024 * 1024))
- {
- JLOG(m_journal.fatal())
- << "Remaining free disk space is less than 512MB";
- signalStop ();
- }
- }
-
- m_jobQueue->addJob(jtSWEEP, "sweep", [this] (Job&) { doSweep(); });
+ m_jobQueue->addJob(
+ jtSWEEP, "sweep", [this] (Job&) { doSweep(); });
}
}
void doSweep ()
{
+ if (! config_->standalone())
+ {
+ boost::filesystem::space_info space =
+ boost::filesystem::space (config_->legacy ("database_path"));
+
+ constexpr std::uintmax_t bytes512M = 512 * 1024 * 1024;
+ if (space.available < (bytes512M))
+ {
+ JLOG(m_journal.fatal())
+ << "Remaining free disk space is less than 512MB";
+ signalStop ();
+ }
+ }
+
// VFALCO NOTE Does the order of calls matter?
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.
@@ -943,7 +941,7 @@ public:
family().treecache().sweep();
cachedSLEs_.expire();
- // VFALCO NOTE does the call to sweep() happen on another thread?
+ // Set timer to do another sweep later.
m_sweepTimer.setExpiration (
std::chrono::seconds {config_->getSize (siSweepInterval)});
}
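
With this change the free-space check runs on the sweep job's thread instead of inside the timer callback. For reference, a self-contained sketch of the check itself, using the same Boost.Filesystem call (the "." path stands in for database_path):

    #include <boost/filesystem.hpp>
    #include <cstdint>
    #include <iostream>

    int main()
    {
        constexpr std::uintmax_t bytes512M = 512 * 1024 * 1024;

        boost::filesystem::space_info const space =
            boost::filesystem::space(".");  // rippled queries database_path

        if (space.available < bytes512M)
            std::cerr << "Remaining free disk space is less than 512MB\n";
    }
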
diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp
index 54351c471..dfe0121df 100644
--- a/src/ripple/app/main/NodeStoreScheduler.cpp
+++ b/src/ripple/app/main/NodeStoreScheduler.cpp
@@ -25,8 +25,6 @@ namespace ripple {
NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent)
: Stoppable ("NodeStoreScheduler", parent)
- , m_jobQueue (nullptr)
- , m_taskCount (0)
{
}
@@ -48,16 +46,29 @@ void NodeStoreScheduler::onChildrenStopped ()
void NodeStoreScheduler::scheduleTask (NodeStore::Task& task)
{
++m_taskCount;
- m_jobQueue->addJob (
+ if (!m_jobQueue->addJob (
jtWRITE,
"NodeObject::store",
- [this, &task] (Job&) { doTask(task); });
+ [this, &task] (Job&) { doTask(task); }))
+ {
+ // Job not added, presumably because we're shutting down.
+ // Recover by executing the task synchronously.
+ doTask (task);
+ }
}
void NodeStoreScheduler::doTask (NodeStore::Task& task)
{
task.performScheduledTask ();
- if ((--m_taskCount == 0) && isStopping())
+
+ // NOTE: It feels a bit off that there are two different methods that
+ // call stopped(): onChildrenStopped() and doTask(). There's a
+ // suspicion that, as long as the Stoppable tree is configured
+ // correctly, this call to stopped() in doTask() can never occur.
+ //
+ // However, until we increase our confidence that the suspicion is
+ // correct, we will leave this code in place.
+ if ((--m_taskCount == 0) && isStopping() && areChildrenStopped())
stopped();
}
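
doTask() signals stopped() only when the final outstanding task completes during shutdown and all children have stopped. A standalone sketch of the decrement-then-check idiom on an atomic counter (all names here are illustrative):

    #include <atomic>
    #include <iostream>

    std::atomic<int> taskCount {0};
    std::atomic<bool> stopping {false};
    std::atomic<bool> childrenStopped {false};

    void stopped() { std::cout << "fully stopped\n"; }

    void doTask()
    {
        // ... perform the scheduled task here ...

        // The thread that drops the count to zero during shutdown is
        // the one responsible for signalling stopped().
        if ((--taskCount == 0) && stopping && childrenStopped)
            stopped();
    }
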
diff --git a/src/ripple/app/main/NodeStoreScheduler.h b/src/ripple/app/main/NodeStoreScheduler.h
index e6689011b..657fe6499 100644
--- a/src/ripple/app/main/NodeStoreScheduler.h
+++ b/src/ripple/app/main/NodeStoreScheduler.h
@@ -49,8 +49,8 @@ public:
private:
void doTask (NodeStore::Task& task);
- JobQueue* m_jobQueue;
- std::atomic<int> m_taskCount;
+ JobQueue* m_jobQueue {nullptr};
+ std::atomic<int> m_taskCount {0};
};
} // ripple
diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp
index c749b7330..e0e84c80b 100644
--- a/src/ripple/app/misc/NetworkOPs.cpp
+++ b/src/ripple/app/misc/NetworkOPs.cpp
@@ -44,7 +44,6 @@
#include
#include
#include
-#include <ripple/core/JobCounter.h>
#include
#include
#include
@@ -218,8 +217,7 @@ public:
~NetworkOPsImp() override
{
- jobCounter_.join();
- // this clear() is necessary to ensure the shared_ptrs in this map get
+ // This clear() is necessary to ensure the shared_ptrs in this map get
// destroyed NOW because the objects in this map invoke methods on this
// class when they are destroyed
mRpcSubMap.clear();
@@ -486,9 +484,6 @@ public:
m_heartbeatTimer.cancel();
m_clusterTimer.cancel();
- // Wait until all our in-flight Jobs are completed.
- jobCounter_.join();
-
stopped ();
}
@@ -540,7 +535,6 @@ private:
DeadlineTimer m_heartbeatTimer;
DeadlineTimer m_clusterTimer;
- JobCounter jobCounter_;
RCLConsensus mConsensus;
@@ -657,14 +651,14 @@ void NetworkOPsImp::onDeadlineTimer (DeadlineTimer& timer)
{
if (timer == m_heartbeatTimer)
{
- m_job_queue.addCountedJob (
- jtNETOP_TIMER, "NetOPs.heartbeat", jobCounter_,
+ m_job_queue.addJob (
+ jtNETOP_TIMER, "NetOPs.heartbeat",
[this] (Job&) { processHeartbeatTimer(); });
}
else if (timer == m_clusterTimer)
{
- m_job_queue.addCountedJob (
- jtNETOP_CLUSTER, "NetOPs.cluster", jobCounter_,
+ m_job_queue.addJob (
+ jtNETOP_CLUSTER, "NetOPs.cluster",
[this] (Job&) { processClusterTimer(); });
}
}
@@ -837,8 +831,8 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr const& iTrans
auto tx = std::make_shared<Transaction> (
trans, reason, app_);
- m_job_queue.addCountedJob (
- jtTRANSACTION, "submitTxn", jobCounter_,
+ m_job_queue.addJob (
+ jtTRANSACTION, "submitTxn",
[this, tx] (Job&) {
auto t = tx;
processTransaction(t, false, false, FailHard::no);
@@ -904,8 +898,8 @@ void NetworkOPsImp::doTransactionAsync (std::shared_ptr transaction
if (mDispatchState == DispatchState::none)
{
- if (m_job_queue.addCountedJob (
- jtBATCH, "transactionBatch", jobCounter_,
+ if (m_job_queue.addJob (
+ jtBATCH, "transactionBatch",
[this] (Job&) { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
@@ -939,8 +933,8 @@ void NetworkOPsImp::doTransactionSync (std::shared_ptr transaction,
if (mTransactions.size())
{
// More transactions need to be applied, but by another job.
- if (m_job_queue.addCountedJob (
- jtBATCH, "transactionBatch", jobCounter_,
+ if (m_job_queue.addJob (
+ jtBATCH, "transactionBatch",
[this] (Job&) { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
@@ -2466,8 +2460,8 @@ void NetworkOPsImp::reportFeeChange ()
// only schedule the job if something has changed
if (f != mLastFeeSummary)
{
- m_job_queue.addCountedJob (
- jtCLIENT, "reportFeeChange->pubServer", jobCounter_,
+ m_job_queue.addJob (
+ jtCLIENT, "reportFeeChange->pubServer",
[this] (Job&) { pubServer(); });
}
}
@@ -3307,10 +3301,6 @@ NetworkOPs::NetworkOPs (Stoppable& parent)
{
}
-NetworkOPs::~NetworkOPs ()
-{
-}
-
//------------------------------------------------------------------------------
diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h
index c47e3ffb5..555605a3c 100644
--- a/src/ripple/app/misc/NetworkOPs.h
+++ b/src/ripple/app/misc/NetworkOPs.h
@@ -20,14 +20,14 @@
#ifndef RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#define RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
-#include
-#include
#include
#include
+#include
+#include
#include
#include
+#include
#include
-#include
#include
#include
@@ -96,7 +96,7 @@ public:
}
public:
- virtual ~NetworkOPs () = 0;
+ ~NetworkOPs () override = default;
//--------------------------------------------------------------------------
//
diff --git a/src/ripple/app/paths/PathRequest.h b/src/ripple/app/paths/PathRequest.h
index 4d2fa1ecf..f5d2254db 100644
--- a/src/ripple/app/paths/PathRequest.h
+++ b/src/ripple/app/paths/PathRequest.h
@@ -70,7 +70,7 @@ public:
beast::Journal journal);
// ripple_path_find semantics
- // Completion function is called
+ // Completion function is called after path update is complete
PathRequest (
Application& app,
std::function<void (void)> const& completion,
@@ -83,6 +83,8 @@ public:
bool isNew ();
bool needsUpdate (bool newOnly, LedgerIndex index);
+
+ // Called when the PathRequest update is complete.
void updateComplete ();
std::pair<bool, Json::Value> doCreate (
diff --git a/src/ripple/app/paths/PathRequests.cpp b/src/ripple/app/paths/PathRequests.cpp
index 69d76ceee..0c71c48b4 100644
--- a/src/ripple/app/paths/PathRequests.cpp
+++ b/src/ripple/app/paths/PathRequests.cpp
@@ -23,6 +23,8 @@
#include
#include
#include
+#include <ripple/net/RPCErr.h>
+#include <ripple/protocol/ErrorCodes.h>
#include
#include
#include
@@ -246,7 +248,12 @@ PathRequests::makeLegacyPathRequest(
else
{
insertPathRequest (req);
- app_.getLedgerMaster().newPathRequest();
+ if (! app_.getLedgerMaster().newPathRequest())
+ {
+ // The newPathRequest failed. Tell the caller.
+ result.second = rpcError (rpcTOO_BUSY);
+ req.reset();
+ }
}
return std::move (result.second);
diff --git a/src/ripple/app/paths/PathRequests.h b/src/ripple/app/paths/PathRequests.h
index c8359566f..fb5cc614e 100644
--- a/src/ripple/app/paths/PathRequests.h
+++ b/src/ripple/app/paths/PathRequests.h
@@ -33,6 +33,7 @@ namespace ripple {
class PathRequests
{
public:
+ /** A collection of all PathRequest instances. */
PathRequests (Application& app,
beast::Journal journal, beast::insight::Collector::ptr const& collector)
: app_ (app)
@@ -43,6 +44,11 @@ public:
mFull = collector->make_event ("pathfind_full");
}
+ /** Update all of the contained PathRequest instances.
+
+ @param ledger Ledger we are pathfinding in.
+ @param shouldCancel Invocable that returns whether to cancel.
+ */
void updateAll (std::shared_ptr<ReadView const> const& ledger,
Job::CancelCallback shouldCancel);
diff --git a/src/ripple/core/Coro.ipp b/src/ripple/core/Coro.ipp
index 8eeadd32f..73198e922 100644
--- a/src/ripple/core/Coro.ipp
+++ b/src/ripple/core/Coro.ipp
@@ -48,7 +48,9 @@ inline
JobQueue::Coro::
~Coro()
{
+#ifndef NDEBUG
assert(finished_);
+#endif
}
inline
@@ -64,7 +66,7 @@ yield() const
}
inline
-void
+bool
JobQueue::Coro::
post()
{
@@ -74,23 +76,76 @@ post()
}
// sp keeps 'this' alive
- jq_.addJob(type_, name_,
+ if (jq_.addJob(type_, name_,
[this, sp = shared_from_this()](Job&)
{
- {
- std::lock_guard lock(jq_.m_mutex);
- --jq_.nSuspend_;
- }
- auto saved = detail::getLocalValues().release();
- detail::getLocalValues().reset(&lvs_);
- std::lock_guard lock(mutex_);
- coro_();
- detail::getLocalValues().release();
- detail::getLocalValues().reset(saved);
- std::lock_guard lk(mutex_run_);
- running_ = false;
- cv_.notify_all();
- });
+ resume();
+ }))
+ {
+ return true;
+ }
+
+ // The coroutine will not run. Clean up running_.
+ std::lock_guard<std::mutex> lk(mutex_run_);
+ running_ = false;
+ cv_.notify_all();
+ return false;
+}
+
+inline
+void
+JobQueue::Coro::
+resume()
+{
+ {
+ std::lock_guard<std::mutex> lk(mutex_run_);
+ running_ = true;
+ }
+ {
+ std::lock_guard<std::mutex> lock(jq_.m_mutex);
+ --jq_.nSuspend_;
+ }
+ auto saved = detail::getLocalValues().release();
+ detail::getLocalValues().reset(&lvs_);
+ std::lock_guard<std::mutex> lock(mutex_);
+ assert (coro_);
+ coro_();
+ detail::getLocalValues().release();
+ detail::getLocalValues().reset(saved);
+ std::lock_guard<std::mutex> lk(mutex_run_);
+ running_ = false;
+ cv_.notify_all();
+}
+
+inline
+bool
+JobQueue::Coro::
+runnable() const
+{
+ return static_cast<bool>(coro_);
+}
+
+inline
+void
+JobQueue::Coro::
+expectEarlyExit()
+{
+#ifndef NDEBUG
+ if (! finished_)
+#endif
+ {
+ // expectEarlyExit() must only ever be called from outside the
+ // Coro's stack. If you're inside the stack you can simply return
+ // and be done.
+ //
+ // That said, since we're outside the Coro's stack, we need to
+ // decrement the nSuspend_ count that the Coro's call to yield() caused.
+ std::lock_guard lock(jq_.m_mutex);
+ --jq_.nSuspend_;
+#ifndef NDEBUG
+ finished_ = true;
+#endif
+ }
}
inline
diff --git a/src/ripple/core/JobCounter.h b/src/ripple/core/JobCounter.h
index 834f3bd67..a80e24824 100644
--- a/src/ripple/core/JobCounter.h
+++ b/src/ripple/core/JobCounter.h
@@ -20,6 +20,7 @@
#ifndef RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
#define RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
+#include
#include
#include
#include
@@ -126,18 +127,31 @@ public:
/** Destructor verifies all in-flight jobs are complete. */
~JobCounter()
{
- join();
+ using namespace std::chrono_literals;
+ join ("JobCounter", 1s, debugLog());
}
- /** Returns once all counted in-flight Jobs are destroyed. */
- void join()
+ /** Returns once all counted in-flight Jobs are destroyed.
+
+ @param name Name reported if join time exceeds wait.
+ @param wait If join() takes longer than this duration, report to the Journal.
+ @param j Journal written to if wait is exceeded.
+ */
+ void join (char const* name,
+ std::chrono::milliseconds wait, beast::Journal j)
{
std::unique_lock<std::mutex> lock {mutex_};
waitForJobs_ = true;
if (jobCount_ > 0)
{
- allJobsDoneCond_.wait (
- lock, [this] { return jobCount_ == 0; });
+ if (! allJobsDoneCond_.wait_for (
+ lock, wait, [this] { return jobCount_ == 0; }))
+ {
+ if (auto stream = j.error())
+ stream << name << " waiting for JobCounter::join().";
+ allJobsDoneCond_.wait (lock, [this] { return jobCount_ == 0; });
+ }
+
}
}
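
The two-stage join() above, a bounded wait_for() that logs followed by an unbounded wait(), keeps a shutdown hang detectable without changing semantics. A self-contained sketch of the same shape with a plain condition_variable and std::cerr standing in for the Journal:

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>

    class Counter
    {
        std::mutex mutex_;
        std::condition_variable allDone_;
        int count_ = 0;

    public:
        void add()
        {
            std::lock_guard<std::mutex> lock {mutex_};
            ++count_;
        }

        void remove()
        {
            {
                std::lock_guard<std::mutex> lock {mutex_};
                --count_;
            }
            allDone_.notify_all();
        }

        // Wait for the count to reach zero.  Complain once if it takes
        // longer than wait, then keep waiting, as join() does above.
        void join(char const* name, std::chrono::milliseconds wait)
        {
            std::unique_lock<std::mutex> lock {mutex_};
            if (! allDone_.wait_for(lock, wait,
                    [this] { return count_ == 0; }))
            {
                std::cerr << name << " still waiting for join()\n";
                allDone_.wait(lock, [this] { return count_ == 0; });
            }
        }
    };
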
diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h
index cf2a87319..8c46c10d2 100644
--- a/src/ripple/core/JobQueue.h
+++ b/src/ripple/core/JobQueue.h
@@ -97,9 +97,31 @@ public:
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the statement
after the previous call to yield.
- Undefined behavior if called consecutively without a corresponding yield.
+ Undefined behavior if called after the coroutine has completed
+ with a return (as opposed to a yield()).
+ Undefined behavior if post() or resume() called consecutively
+ without a corresponding yield.
+
+ @return true if the Coro's job is added to the JobQueue.
*/
- void post();
+ bool post();
+
+ /** Resume coroutine execution.
+ Effects:
+ The coroutine continues execution from where it last left off
+ using this same thread.
+ Undefined behavior if called after the coroutine has completed
+ with a return (as opposed to a yield()).
+ Undefined behavior if resume() or post() called consecutively
+ without a corresponding yield.
+ */
+ void resume();
+
+ /** Returns true if the Coro is still runnable (has not returned). */
+ bool runnable() const;
+
+ /** Once called, the Coro allows early exit without an assert. */
+ void expectEarlyExit();
/** Waits until coroutine returns from the user function. */
void join();
@@ -113,29 +135,23 @@ public:
/** Adds a job to the JobQueue.
- @param t The type of job.
+ @param type The type of job.
@param name Name of the job.
- @param func std::function with signature void (Job&). Called when the job is executed.
- */
- void addJob (JobType type, std::string const& name, JobFunction const& func);
-
- /** Adds a counted job to the JobQueue.
-
- @param t The type of job.
- @param name Name of the job.
- @param counter JobCounter for counting the Job.
@param jobHandler Lambda with signature void (Job&). Called when the job is executed.
- @return true if JobHandler added, false if JobCounter is already joined.
+ @return true if jobHandler added to queue.
*/
- template <typename JobHandler>
- bool addCountedJob (JobType type,
- std::string const& name, JobCounter& counter, JobHandler&& jobHandler)
+ template <typename JobHandler,
+ typename = std::enable_if_t<std::is_same<decltype(
+ std::declval<JobHandler&&>()(std::declval<Job&>())), void>::value>>
+ bool addJob (JobType type,
+ std::string const& name, JobHandler&& jobHandler)
{
- if (auto optionalCountedJob = counter.wrap (std::move (jobHandler)))
+ if (auto optionalCountedJob =
+ Stoppable::jobCounter().wrap (std::forward<JobHandler>(jobHandler)))
{
- addJob (type, name, std::move (*optionalCountedJob));
- return true;
+ return addRefCountedJob (
+ type, name, std::move (*optionalCountedJob));
}
return false;
}
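
The enable_if_t constraint above restricts addJob() to handlers invocable as void (Job&). A minimal sketch of the same SFINAE technique outside of rippled (Job here is a stand-in type, not ripple::Job):

    #include <type_traits>
    #include <utility>

    struct Job {};  // stand-in for ripple::Job

    template <typename Handler,
        typename = std::enable_if_t<std::is_same<decltype(
            std::declval<Handler&&>()(std::declval<Job&>())), void>::value>>
    bool addJob(Handler&& handler)
    {
        Job job;
        std::forward<Handler>(handler)(job);  // run inline for the sketch
        return true;
    }

    int main()
    {
        addJob([](Job&) {});       // compiles: handler matches void (Job&)
        // addJob([](int) {});     // would not compile: wrong signature
    }
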
@@ -145,9 +161,11 @@ public:
@param t The type of job.
@param name Name of the job.
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the job executes.
+
+ @return shared_ptr to posted Coro. nullptr if post was not successful.
*/
template <class F>
- void postCoro (JobType t, std::string const& name, F&& f);
+ std::shared_ptr<Coro> postCoro (JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
@@ -227,6 +245,16 @@ private:
// Signals the service stopped if the stopped condition is met.
void checkStopped (std::lock_guard <std::mutex> const& lock);
+ // Adds a reference counted job to the JobQueue.
+ //
+ // param type The type of job.
+ // param name Name of the job.
+ // param func std::function with signature void (Job&). Called when the job is executed.
+ //
+ // return true if func added to queue.
+ bool addRefCountedJob (
+ JobType type, std::string const& name, JobFunction const& func);
+
// Signals an added Job for processing.
//
// Pre-conditions:
@@ -311,15 +339,15 @@ private:
other requests while the RPC command completes its work asynchronously.
postCoro() creates a Coro object. When the Coro ctor is called, and its
- coro_ member is initialized(a boost::coroutines::pull_type), execution
+ coro_ member is initialized (a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note here
that construction of a boost pull_type automatically passes execution to the
coroutine. A pull_type object automatically generates a push_type that is
- used as the as a parameter(do_yield) in the signature of the function the
+ passed as a parameter (do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, Coro::yield_ is assigned the push_type
- parameter(do_yield) address and called(yield()) so we can return execution
+ parameter (do_yield) address and called (yield()) so we can return execution
back to the caller's stack.
postCoro() then calls Coro::post(), which schedules a job on the job
@@ -368,15 +396,23 @@ private:
namespace ripple {
template <class F>
-void JobQueue::postCoro (JobType t, std::string const& name, F&& f)
+std::shared_ptr<JobQueue::Coro>
+JobQueue::postCoro (JobType t, std::string const& name, F&& f)
{
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
*/
- auto const coro = std::make_shared<Coro>(
+ auto coro = std::make_shared<Coro>(
Coro_create_t{}, *this, t, name, std::forward<F>(f));
- coro->post();
+ if (! coro->post())
+ {
+ // The Coro was not successfully posted. Disable it so its destructor
+ // can run with no negative side effects. Then destroy it.
+ coro->expectEarlyExit();
+ coro.reset();
+ }
+ return coro;
}
}
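
The comment in this header about pull_type/push_type compresses a lot of Boost.Coroutine behavior. A few self-contained lines showing what it describes: constructing a pull_type immediately runs the coroutine up to its first yield, and invoking the pull_type resumes it (do_yield matches the parameter name the comment uses):

    #include <boost/coroutine/asymmetric_coroutine.hpp>
    #include <iostream>

    int main()
    {
        using coro_t = boost::coroutines::asymmetric_coroutine<void>;

        coro_t::pull_type source(
            [](coro_t::push_type& do_yield)
            {
                std::cout << "runs during construction\n";
                do_yield();  // return execution to the caller's stack
                std::cout << "runs when resumed\n";
            });

        std::cout << "back on the caller's stack\n";
        source();  // resume the coroutine past its yield
    }
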
diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h
index fde3eb591..aae8adef0 100644
--- a/src/ripple/core/Stoppable.h
+++ b/src/ripple/core/Stoppable.h
@@ -23,6 +23,7 @@
#include
#include
#include
+#include <ripple/core/JobCounter.h>
#include
#include
#include
@@ -167,6 +168,30 @@ class RootStoppable;
when the last thread is about to exit it would call stopped().
@note A Stoppable may not be restarted.
+
+ The form of the Stoppable tree in the rippled application evolves as
+ the source code changes and reacts to new demands. As of March in 2017
+ the Stoppable tree had this form:
+
+ @code
+
+ Application
+ |
+ +--------------------+--------------------+
+ | | |
+ LoadManager SHAMapStore NodeStoreScheduler
+ |
+ JobQueue
+ |
+ +-----------+-----------+-----------+-----------+----+--------+
+ | | | | | |
+ | NetworkOPs | InboundLedgers | OrderbookDB
+ | | |
+ Overlay InboundTransactions LedgerMaster
+ | |
+ PeerFinder LedgerCleaner
+
+ @endcode
*/
/** @{ */
class Stoppable
@@ -190,6 +215,9 @@ public:
/** Returns `true` if all children have stopped. */
bool areChildrenStopped () const;
+ /* JobQueue uses this method for Job counting. */
+ inline JobCounter& jobCounter ();
+
/** Sleep or wake up on stop.
@return `true` if we are stopping
@@ -282,9 +310,8 @@ private:
std::string m_name;
RootStoppable& m_root;
Child m_child;
- std::atomic<bool> m_started;
- std::atomic<bool> m_stopped;
- std::atomic<bool> m_childrenStopped;
+ std::atomic<bool> m_stopped {false};
+ std::atomic<bool> m_childrenStopped {false};
Children m_children;
beast::WaitableEvent m_stoppedEvent;
};
@@ -296,7 +323,7 @@ class RootStoppable : public Stoppable
public:
explicit RootStoppable (std::string name);
- ~RootStoppable () = default;
+ ~RootStoppable ();
bool isStopping() const;
@@ -326,6 +353,18 @@ public:
*/
void stop (beast::Journal j);
+ /** Return true if start() was ever called. */
+ bool started () const
+ {
+ return m_started;
+ }
+
+ /* JobQueue uses this method for Job counting. */
+ JobCounter& rootJobCounter ()
+ {
+ return jobCounter_;
+ }
+
/** Sleep or wake up on stop.
@return `true` if we are stopping
@@ -346,15 +385,24 @@ private:
*/
bool stopAsync(beast::Journal j);
- std::atomic<bool> m_prepared;
- std::atomic<bool> m_calledStop;
+ std::atomic<bool> m_prepared {false};
+ std::atomic<bool> m_started {false};
+ std::atomic<bool> m_calledStop {false};
std::mutex m_;
std::condition_variable c_;
+ JobCounter jobCounter_;
};
/** @} */
//------------------------------------------------------------------------------
+JobCounter& Stoppable::jobCounter ()
+{
+ return m_root.rootJobCounter();
+}
+
+//------------------------------------------------------------------------------
+
template
bool
RootStoppable::alertable_sleep_for(
diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp
index ffb919a43..f68897e80 100644
--- a/src/ripple/core/impl/JobQueue.cpp
+++ b/src/ripple/core/impl/JobQueue.cpp
@@ -67,8 +67,8 @@ JobQueue::collect ()
job_count = m_jobSet.size ();
}
-void
-JobQueue::addJob (JobType type, std::string const& name,
+bool
+JobQueue::addRefCountedJob (JobType type, std::string const& name,
JobFunction const& func)
{
assert (type != jtINVALID);
@@ -76,7 +76,7 @@ JobQueue::addJob (JobType type, std::string const& name,
auto iter (m_jobData.find (type));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
- return;
+ return false;
JobTypeData& data (iter->second);
@@ -108,6 +108,7 @@ JobQueue::addJob (JobType type, std::string const& name,
data.load (), func, m_cancelCallback)));
queueJob (*result.first, lock);
}
+ return true;
}
int
diff --git a/src/ripple/core/impl/SociDB.cpp b/src/ripple/core/impl/SociDB.cpp
index aeae7b872..c43daa05b 100644
--- a/src/ripple/core/impl/SociDB.cpp
+++ b/src/ripple/core/impl/SociDB.cpp
@@ -226,7 +226,13 @@ private:
running_ = true;
}
- jobQueue_.addJob (jtWAL, "WAL", [this] (Job&) { checkpoint(); });
+ // If the Job is not added to the JobQueue then we're not running_.
+ if (! jobQueue_.addJob (
+ jtWAL, "WAL", [this] (Job&) { checkpoint(); }))
+ {
+ std::lock_guard <std::mutex> lock (mutex_);
+ running_ = false;
+ }
}
void checkpoint ()
diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp
index 862e592a0..ecd55475e 100644
--- a/src/ripple/core/impl/Stoppable.cpp
+++ b/src/ripple/core/impl/Stoppable.cpp
@@ -26,9 +26,6 @@ Stoppable::Stoppable (std::string name, RootStoppable& root)
: m_name (std::move (name))
, m_root (root)
, m_child (this)
- , m_started (false)
- , m_stopped (false)
- , m_childrenStopped (false)
{
}
@@ -36,9 +33,6 @@ Stoppable::Stoppable (std::string name, Stoppable& parent)
: m_name (std::move (name))
, m_root (parent.m_root)
, m_child (this)
- , m_started (false)
- , m_stopped (false)
- , m_childrenStopped (false)
{
// Must not have stopping parent.
assert (! parent.isStopping());
@@ -48,8 +42,8 @@ Stoppable::Stoppable (std::string name, Stoppable& parent)
Stoppable::~Stoppable ()
{
- // Children must be stopped.
- assert (!m_started || m_childrenStopped);
+ // Either we must not have started, or Children must be stopped.
+ assert (!m_root.started() || m_childrenStopped);
}
bool Stoppable::isStopping() const
@@ -113,12 +107,13 @@ void Stoppable::stopAsyncRecursive (beast::Journal j)
auto const start = high_resolution_clock::now();
onStop ();
auto const ms = duration_cast(
- high_resolution_clock::now() - start).count();
+ high_resolution_clock::now() - start);
#ifdef NDEBUG
- if (ms >= 10)
+ using namespace std::chrono_literals;
+ if (ms >= 10ms)
if (auto stream = j.fatal())
- stream << m_name << "::onStop took " << ms << "ms";
+ stream << m_name << "::onStop took " << ms.count() << "ms";
#else
(void)ms;
#endif
@@ -159,11 +154,15 @@ void Stoppable::stopRecursive (beast::Journal j)
RootStoppable::RootStoppable (std::string name)
: Stoppable (std::move (name), *this)
- , m_prepared (false)
- , m_calledStop (false)
{
}
+RootStoppable::~RootStoppable ()
+{
+ using namespace std::chrono_literals;
+ jobCounter_.join(m_name.c_str(), 1s, debugLog());
+}
+
bool RootStoppable::isStopping() const
{
return m_calledStop;
@@ -194,7 +193,7 @@ void RootStoppable::stop (beast::Journal j)
stopRecursive (j);
}
-bool RootStoppable::stopAsync(beast::Journal j)
+bool RootStoppable::stopAsync (beast::Journal j)
{
bool alreadyCalled;
{
@@ -211,6 +210,11 @@ bool RootStoppable::stopAsync(beast::Journal j)
stream << "Stoppable::stop called again";
return false;
}
+
+ // Wait until all in-flight JobQueue Jobs are completed.
+ using namespace std::chrono_literals;
+ jobCounter_.join (m_name.c_str(), 1s, j);
+
c_.notify_all();
stopAsyncRecursive(j);
return true;
diff --git a/src/ripple/net/impl/RPCSub.cpp b/src/ripple/net/impl/RPCSub.cpp
index ac7b74501..a08d38294 100644
--- a/src/ripple/net/impl/RPCSub.cpp
+++ b/src/ripple/net/impl/RPCSub.cpp
@@ -93,11 +93,9 @@ public:
if (!mSending)
{
// Start a sending thread.
- mSending = true;
-
JLOG (j_.info()) << "RPCCall::fromNetwork start";
- m_jobQueue.addJob (
+ mSending = m_jobQueue.addJob (
jtCLIENT, "RPCSub::sendThread", [this] (Job&) {
sendThread();
});
diff --git a/src/ripple/rpc/handlers/RipplePathFind.cpp b/src/ripple/rpc/handlers/RipplePathFind.cpp
index 0e59886fe..48fab268f 100644
--- a/src/ripple/rpc/handlers/RipplePathFind.cpp
+++ b/src/ripple/rpc/handlers/RipplePathFind.cpp
@@ -55,9 +55,93 @@ Json::Value doRipplePathFind (RPC::Context& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
+ // It doesn't look like there's much odd happening here, but you should
+ // be aware this code runs in a JobQueue::Coro, which is a coroutine.
+ // And we may be flipping around between threads. Here's an overview:
+ //
+ // 1. We're running doRipplePathFind() due to a call to
+ // ripple_path_find. doRipplePathFind() is currently running
+ // inside of a JobQueue::Coro using a JobQueue thread.
+ //
+ // 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
+ // path-finding request. That request will (probably) run at some
+ // indeterminate future time on a (probably different) JobQueue
+ // thread.
+ //
+ // 3. As a continuation from that path-finding JobQueue thread, the
+ // coroutine we're currently running in (!) is posted to the
+ // JobQueue. Because it is a continuation, that post won't
+ // happen until the path-finding request completes.
+ //
+ // 4. Once the continuation is enqueued, and we have reason to think
+ // the path-finding job is likely to run, then the coroutine we're
+ // running in yield()s. That means it surrenders its thread in
+ // the JobQueue. The coroutine is suspended, but ready to run,
+ // because it is kept resident by a shared_ptr in the
+ // path-finding continuation.
+ //
+ // 5. If all goes well then path-finding runs on a JobQueue thread
+ // and executes its continuation. The continuation posts this
+ // same coroutine (!) to the JobQueue.
+ //
+ // 6. When the JobQueue calls this coroutine, this coroutine resumes
+ // from the line below the coro->yield() and returns the
+ // path-finding result.
+ //
+ // With so many moving parts, what could go wrong?
+ //
+ // Just in terms of the JobQueue refusing to add jobs at shutdown
+ // there are two specific things that can go wrong.
+ //
+ // 1. The path-finding Job queued by makeLegacyPathRequest() might be
+ // rejected (because we're shutting down).
+ //
+ // Fortunately this problem can be addressed by looking at the
+ // return value of makeLegacyPathRequest(). If
+ // makeLegacyPathRequest() cannot get a thread to run the path-find
+ // on, then it returns an empty request.
+ //
+ // 2. The path-finding job might run, but the Coro::post() might be
+ // rejected by the JobQueue (because we're shutting down).
+ //
+ // We handle this case by resuming (not posting) the Coro.
+ // By resuming the Coro, we allow the Coro to run to completion
+ // on the current thread instead of requiring that it run on a
+ // new thread from the JobQueue.
+ //
+ // Both of these failure modes are hard to recreate in a unit test
+ // because they are so dependent on inter-thread timing. However
+ // the failure modes can be observed by synchronously (inside the
+ // rippled source code) shutting down the application. The code to
+ // do so looks like this:
+ //
+ // context.app.signalStop();
+ // while (! context.app.getJobQueue().jobCounter().joined()) { }
+ //
+ // The first line starts the process of shutting down the app.
+ // The second line waits until no more jobs can be added to the
+ // JobQueue before letting the thread continue.
+ //
+ // May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest (
- request, std::bind(&JobQueue::Coro::post, context.coro),
- context.consumer, lpLedger, context.params);
+ request,
+ [&context] () {
+ // Copying the shared_ptr keeps the coroutine alive up
+ // through the return. Otherwise the storage under the
+ // captured reference could evaporate when we return from
+ // coroCopy->resume(). This is not strictly necessary, but
+ // will make maintenance easier.
+ std::shared_ptr<JobQueue::Coro> coroCopy {context.coro};
+ if (!coroCopy->post())
+ {
+ // The post() failed, so we won't get a thread to let
+ // the Coro finish. We'll call Coro::resume() so the
+ // Coro can finish on our thread. Otherwise the
+ // application will hang on shutdown.
+ coroCopy->resume();
+ }
+ },
+ context.consumer, lpLedger, context.params);
if (request)
{
context.coro->yield();
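
Step 4 of the comment above notes that the suspended coroutine is "kept resident by a shared_ptr in the path-finding continuation," and the new completion lambda copies the shared_ptr for the same reason. A tiny standalone sketch of that lifetime guarantee (Coro here is a stand-in type, not JobQueue::Coro):

    #include <functional>
    #include <iostream>
    #include <memory>

    struct Coro
    {
        void resume() { std::cout << "resumed\n"; }
    };

    int main()
    {
        std::function<void()> continuation;
        {
            auto coro = std::make_shared<Coro>();
            // Capturing the shared_ptr by value keeps the Coro alive
            // for as long as the continuation itself exists.
            continuation = [coro] { coro->resume(); };
        }   // the local shared_ptr is gone; the capture still owns the Coro
        continuation();  // safe: the Coro is still alive
    }
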
diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp
index b94801740..0e806c134 100644
--- a/src/ripple/rpc/impl/ServerHandlerImp.cpp
+++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp
@@ -309,11 +309,20 @@ ServerHandlerImp::onRequest (Session& session)
return;
}
- m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
- [this, detach = session.detach()](std::shared_ptr c)
+ std::shared_ptr<Session> detachedSession = session.detach();
+ auto const postResult = m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
+ [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro)
{
- processSession(detach, c);
+ processSession(detachedSession, coro);
});
+ if (postResult == nullptr)
+ {
+ // The coroutine was rejected, probably because we're shutting down.
+ HTTPReply(503, "Service Unavailable",
+ makeOutput(*detachedSession), app_.journal("RPC"));
+ detachedSession->close(true);
+ return;
+ }
}
void
@@ -350,12 +359,12 @@ ServerHandlerImp::onWSMessage(
JLOG(m_journal.trace())
<< "Websocket received '" << jv << "'";
- m_jobQueue.postCoro(jtCLIENT, "WS-Client",
- [this, session = std::move(session),
- jv = std::move(jv)](auto const& c)
+ auto const postResult = m_jobQueue.postCoro(jtCLIENT, "WS-Client",
+ [this, session, jv = std::move(jv)]
+ (std::shared_ptr<JobQueue::Coro> const& coro)
{
auto const jr =
- this->processSession(session, c, jv);
+ this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
beast::multi_buffer sb(n);
@@ -365,6 +374,11 @@ ServerHandlerImp::onWSMessage(
StreambufWSMsg>(std::move(sb)));
session->complete();
});
+ if (postResult == nullptr)
+ {
+ // The coroutine was rejected, probably because we're shutting down.
+ session->close();
+ }
}
void
diff --git a/src/test/core/JobCounter_test.cpp b/src/test/core/JobCounter_test.cpp
index 435fcceca..4cec0249a 100644
--- a/src/test/core/JobCounter_test.cpp
+++ b/src/test/core/JobCounter_test.cpp
@@ -20,16 +20,22 @@
#include
#include
#include
+#include <test/jtx/Env.h>
#include
#include
#include
namespace ripple {
+namespace test {
//------------------------------------------------------------------------------
class JobCounter_test : public beast::unit_test::suite
{
+ // We're only using Env for its Journal.
+ jtx::Env env {*this};
+ beast::Journal j {env.app().journal ("JobCounter_test")};
+
void testWrap()
{
// Verify reference counting.
@@ -41,8 +47,8 @@ class JobCounter_test : public beast::unit_test::suite
// wrapped1 should be callable with a Job.
{
- Job j;
- (*wrapped1)(j);
+ Job job;
+ (*wrapped1)(job);
}
{
// Copy should increase reference count.
@@ -66,7 +72,8 @@ class JobCounter_test : public beast::unit_test::suite
BEAST_EXPECT (jobCounter.count() == 0);
// Join with 0 count should not stall.
- jobCounter.join();
+ using namespace std::chrono_literals;
+ jobCounter.join("testWrap", 1ms, j);
// Wrapping a Job after join() should return boost::none.
BEAST_EXPECT (jobCounter.wrap ([] (Job&) {}) == boost::none);
@@ -83,21 +90,22 @@ class JobCounter_test : public beast::unit_test::suite
// Calling join() now should stall, so do it on a different thread.
std::atomic<bool> threadExited {false};
- std::thread localThread ([&jobCounter, &threadExited] ()
+ std::thread localThread ([&jobCounter, &threadExited, this] ()
{
// Should stall after calling join.
- jobCounter.join();
+ using namespace std::chrono_literals;
+ jobCounter.join("testWaitOnJoin", 1ms, j);
threadExited.store (true);
});
// Wait for the thread to call jobCounter.join().
while (! jobCounter.joined());
- // The thread should still be active after waiting a millisecond.
+ // The thread should still be active after waiting 5 milliseconds.
// This is not a guarantee that join() stalled the thread, but it
// improves confidence.
using namespace std::chrono_literals;
- std::this_thread::sleep_for (1ms);
+ std::this_thread::sleep_for (5ms);
BEAST_EXPECT (threadExited == false);
// Destroy the Job and expect the thread to exit (asynchronously).
@@ -119,4 +127,5 @@ public:
BEAST_DEFINE_TESTSUITE(JobCounter, core, ripple);
-}
+} // test
+} // ripple
diff --git a/src/test/core/JobQueue_test.cpp b/src/test/core/JobQueue_test.cpp
new file mode 100644
index 000000000..11f355dd6
--- /dev/null
+++ b/src/test/core/JobQueue_test.cpp
@@ -0,0 +1,154 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2017 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+
+namespace ripple {
+namespace test {
+
+//------------------------------------------------------------------------------
+
+class JobQueue_test : public beast::unit_test::suite
+{
+ void testAddJob()
+ {
+ jtx::Env env {*this};
+
+ JobQueue& jQueue = env.app().getJobQueue();
+ {
+ // addJob() should run the Job (and return true).
+ std::atomic<bool> jobRan {false};
+ BEAST_EXPECT (jQueue.addJob (jtCLIENT, "JobAddTest1",
+ [&jobRan] (Job&) { jobRan = true; }) == true);
+
+ // Wait for the Job to run.
+ while (jobRan == false);
+ }
+ {
+ // If the JobQueue's JobCounter is join()ed we should no
+ // longer be able to add Jobs (and calling addJob() should
+ // return false).
+ using namespace std::chrono_literals;
+ beast::Journal j {env.app().journal ("JobQueue_test")};
+ JobCounter& jCounter = jQueue.jobCounter();
+ jCounter.join("JobQueue_test", 1s, j);
+
+ // The Job should never run, so having the Job access this
+ // unprotected variable on the stack should be completely safe.
+ // Not recommended for the faint of heart...
+ bool unprotected;
+ BEAST_EXPECT (jQueue.addJob (jtCLIENT, "JobAddTest2",
+ [&unprotected] (Job&) { unprotected = false; }) == false);
+ }
+ }
+
+ void testPostCoro()
+ {
+ jtx::Env env {*this};
+
+ JobQueue& jQueue = env.app().getJobQueue();
+ {
+ // Test repeated post()s until the Coro completes.
+ std::atomic yieldCount {0};
+ auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest1",
+ [&yieldCount] (std::shared_ptr<JobQueue::Coro> const& coroCopy)
+ {
+ while (++yieldCount < 4)
+ coroCopy->yield();
+ });
+ BEAST_EXPECT (coro != nullptr);
+
+ // Wait for the Job to run and yield.
+ while (yieldCount == 0);
+
+ // Now re-post until the Coro says it is done.
+ int old = yieldCount;
+ while (coro->runnable())
+ {
+ BEAST_EXPECT (coro->post());
+ while (old == yieldCount) { }
+ coro->join();
+ BEAST_EXPECT (++old == yieldCount);
+ }
+ BEAST_EXPECT (yieldCount == 4);
+ }
+ {
+ // Test repeated resume()s until the Coro completes.
+ int yieldCount {0};
+ auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest2",
+ [&yieldCount] (std::shared_ptr<JobQueue::Coro> const& coroCopy)
+ {
+ while (++yieldCount < 4)
+ coroCopy->yield();
+ });
+ if (! coro)
+ {
+ // There's no good reason we should not get a Coro, but we
+ // can't continue without one.
+ BEAST_EXPECT (false);
+ return;
+ }
+
+ // Wait for the Job to run and yield.
+ coro->join();
+
+ // Now resume until the Coro says it is done.
+ int old = yieldCount;
+ while (coro->runnable())
+ {
+ coro->resume(); // Resume runs synchronously on this thread.
+ BEAST_EXPECT (++old == yieldCount);
+ }
+ BEAST_EXPECT (yieldCount == 4);
+ }
+ {
+ // If the JobQueue's JobCounter is join()ed we should no
+ // longer be able to add a Coro (and calling postCoro() should
+ // return false).
+ using namespace std::chrono_literals;
+ beast::Journal j {env.app().journal ("JobQueue_test")};
+ JobCounter& jCounter = jQueue.jobCounter();
+ jCounter.join("JobQueue_test", 1s, j);
+
+ // The Coro should never run, so having the Coro access this
+ // unprotected variable on the stack should be completely safe.
+ // Not recommended for the faint of heart...
+ bool unprotected;
+ auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest3",
+ [&unprotected] (std::shared_ptr<JobQueue::Coro> const&)
+ { unprotected = false; });
+ BEAST_EXPECT (coro == nullptr);
+ }
+ }
+
+public:
+ void run()
+ {
+ testAddJob();
+ testPostCoro();
+ }
+};
+
+BEAST_DEFINE_TESTSUITE(JobQueue, core, ripple);
+
+} // test
+} // ripple
diff --git a/src/test/unity/core_test_unity.cpp b/src/test/unity/core_test_unity.cpp
index c6fea7810..c9803768e 100644
--- a/src/test/unity/core_test_unity.cpp
+++ b/src/test/unity/core_test_unity.cpp
@@ -23,6 +23,7 @@
#include
#include
#include
+#include <test/core/JobQueue_test.cpp>
#include
#include
#include