From efe3700f70ddc68d768246b43b743f5993fca85f Mon Sep 17 00:00:00 2001 From: Scott Schurr Date: Thu, 6 Apr 2017 17:39:42 -0700 Subject: [PATCH] Don't use JobQueue during shutdown (RIPD-1356): If the JobQueue is used during shutdown then those Jobs may access Stoppables after they have already stopped. This violates the preconditions of Stoppables and may lead to undefined behavior. The solution taken here is to reference count all Jobs in the JobQueue. At stop time all Jobs already in the JobQueue are allowed to run to completion, but no further Jobs are allowed into the JobQueue. If a Job is rejected from the JobQueue (because we are stopping), then JobQueue::addJob() returns false, so the caller can make any necessary adjustments. --- Builds/VisualStudio2015/RippleD.vcxproj | 4 + .../VisualStudio2015/RippleD.vcxproj.filters | 3 + src/ripple/app/consensus/RCLValidations.cpp | 4 +- src/ripple/app/ledger/Ledger.cpp | 27 ++- src/ripple/app/ledger/LedgerMaster.h | 20 ++- src/ripple/app/ledger/OrderBookDB.cpp | 9 + src/ripple/app/ledger/impl/LedgerMaster.cpp | 39 +++-- .../app/ledger/impl/TransactionAcquire.cpp | 5 + src/ripple/app/main/Application.cpp | 36 ++-- src/ripple/app/main/NodeStoreScheduler.cpp | 21 ++- src/ripple/app/main/NodeStoreScheduler.h | 4 +- src/ripple/app/misc/NetworkOPs.cpp | 36 ++-- src/ripple/app/misc/NetworkOPs.h | 8 +- src/ripple/app/paths/PathRequest.h | 4 +- src/ripple/app/paths/PathRequests.cpp | 9 +- src/ripple/app/paths/PathRequests.h | 6 + src/ripple/core/Coro.ipp | 87 ++++++++-- src/ripple/core/JobCounter.h | 24 ++- src/ripple/core/JobQueue.h | 88 +++++++--- src/ripple/core/Stoppable.h | 60 ++++++- src/ripple/core/impl/JobQueue.cpp | 7 +- src/ripple/core/impl/SociDB.cpp | 8 +- src/ripple/core/impl/Stoppable.cpp | 32 ++-- src/ripple/net/impl/RPCSub.cpp | 4 +- src/ripple/rpc/handlers/RipplePathFind.cpp | 88 +++++++++- src/ripple/rpc/impl/ServerHandlerImp.cpp | 28 +++- src/test/core/JobCounter_test.cpp | 25 ++- src/test/core/JobQueue_test.cpp | 154 ++++++++++++++++++ src/test/unity/core_test_unity.cpp | 1 + 29 files changed, 652 insertions(+), 189 deletions(-) create mode 100644 src/test/core/JobQueue_test.cpp diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index 76041f25a4..629ce32ba2 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -4511,6 +4511,10 @@ True True + + True + True + True True diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index c75f010366..244c56165d 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -5265,6 +5265,9 @@ test\core + + test\core + test\core diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp index f1de8f55af..14c352fc2f 100644 --- a/src/ripple/app/consensus/RCLValidations.cpp +++ b/src/ripple/app/consensus/RCLValidations.cpp @@ -59,8 +59,8 @@ RCLValidationsPolicy::onStale(RCLValidation&& v) if (staleWriting_) return; - staleWriting_ = true; - app_.getJobQueue().addJob( + // addJob() may return false (Job not added) at shutdown. + staleWriting_ = app_.getJobQueue().addJob( jtWRITE, "Validations::doStaleWrite", [this](Job&) { auto event = app_.getJobQueue().makeLoadEvent(jtDISK, "ValidationWrite"); diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp index 2881a67fe6..61543b9b9e 100644 --- a/src/ripple/app/ledger/Ledger.cpp +++ b/src/ripple/app/ledger/Ledger.cpp @@ -1048,25 +1048,22 @@ bool pendSaveValidated ( return true; } - if (isSynchronous) - return saveValidatedLedger(app, ledger, isCurrent); + JobType const jobType {isCurrent ? jtPUBLEDGER : jtPUBOLDLEDGER}; + char const* const jobName { + isCurrent ? "Ledger::pendSave" : "Ledger::pendOldSave"}; - auto job = [ledger, &app, isCurrent] (Job&) { - saveValidatedLedger(app, ledger, isCurrent); - }; - - if (isCurrent) + // See if we can use the JobQueue. + if (!isSynchronous && + app.getJobQueue().addJob (jobType, jobName, + [&app, ledger, isCurrent] (Job&) { + saveValidatedLedger(app, ledger, isCurrent); + })) { - app.getJobQueue().addJob( - jtPUBLEDGER, "Ledger::pendSave", job); - } - else - { - app.getJobQueue ().addJob( - jtPUBOLDLEDGER, "Ledger::pendOldSave", job); + return true; } - return true; + // The JobQueue won't do the Job. Do the save synchronously. + return saveValidatedLedger(app, ledger, isCurrent); } void diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 21006d361e..9f522e1b4b 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -203,9 +203,9 @@ public: void setBuildingLedger (LedgerIndex index); void tryAdvance (); - void newPathRequest (); + bool newPathRequest (); // Returns true if path request successfully placed. bool isNewPathRequest (); - void newOrderBookDB (); + bool newOrderBookDB (); // Returns true if able to fulfill request. bool fixIndex ( LedgerIndex ledgerIndex, LedgerHash const& ledgerHash); @@ -242,6 +242,9 @@ public: std::size_t getFetchPackCacheSize () const; private: + using ScopedLockType = std::lock_guard ; + using ScopedUnlockType = GenericScopedUnlock ; + void setValidLedger( std::shared_ptr const& l); void setPubLedger( @@ -255,8 +258,9 @@ private: boost::optional getLedgerHashForHistory(LedgerIndex index); std::size_t getNeededValidations(); void advanceThread(); - // Try to publish ledgers, acquire missing ledgers - void doAdvance(); + // Try to publish ledgers, acquire missing ledgers. Always called with + // m_mutex locked. The passed ScopedLockType is a reminder to callers. + void doAdvance(ScopedLockType&); bool shouldFetchPack(std::uint32_t seq) const; bool shouldAcquire( std::uint32_t const currentLedger, @@ -268,12 +272,12 @@ private: findNewLedgersToPublish(); void updatePaths(Job& job); - void newPFWork(const char *name); + + // Returns true if work started. Always called with m_mutex locked. + // The passed ScopedLockType is a reminder to callers. + bool newPFWork(const char *name, ScopedLockType&); private: - using ScopedLockType = std::lock_guard ; - using ScopedUnlockType = GenericScopedUnlock ; - Application& app_; beast::Journal m_journal; diff --git a/src/ripple/app/ledger/OrderBookDB.cpp b/src/ripple/app/ledger/OrderBookDB.cpp index f28c0afb0b..183007d9f6 100644 --- a/src/ripple/app/ledger/OrderBookDB.cpp +++ b/src/ripple/app/ledger/OrderBookDB.cpp @@ -101,6 +101,15 @@ void OrderBookDB::update( { for(auto& sle : ledger->sles) { + if (isStopping()) + { + JLOG (j_.info()) + << "OrderBookDB::update exiting due to isStopping"; + std::lock_guard sl (mLock); + mSeq = 0; + return; + } + if (sle->getType () == ltDIR_NODE && sle->isFieldPresent (sfExchangeRate) && sle->getFieldH256 (sfRootIndex) == sle->key()) diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 5deed8de77..959ad2b6bd 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -919,7 +919,7 @@ LedgerMaster::advanceThread() try { - doAdvance(); + doAdvance(sl); } catch (std::exception const&) { @@ -1159,6 +1159,7 @@ LedgerMaster::updatePaths (Job& job) { JLOG (m_journal.debug()) << "Published ledger too old for updating paths"; + ScopedLockType ml (m_mutex); --mPathFindThread; return; } @@ -1193,48 +1194,51 @@ LedgerMaster::updatePaths (Job& job) } } -void +bool LedgerMaster::newPathRequest () { ScopedLockType ml (m_mutex); - mPathFindNewRequest = true; - - newPFWork("pf:newRequest"); + mPathFindNewRequest = newPFWork("pf:newRequest", ml); + return mPathFindNewRequest; } bool LedgerMaster::isNewPathRequest () { ScopedLockType ml (m_mutex); - if (!mPathFindNewRequest) - return false; + bool const ret = mPathFindNewRequest; mPathFindNewRequest = false; - return true; + return ret; } // If the order book is radically updated, we need to reprocess all // pathfinding requests. -void +bool LedgerMaster::newOrderBookDB () { ScopedLockType ml (m_mutex); mPathLedger.reset(); - newPFWork("pf:newOBDB"); + return newPFWork("pf:newOBDB", ml); } /** A thread needs to be dispatched to handle pathfinding work of some kind. */ -void -LedgerMaster::newPFWork (const char *name) +bool +LedgerMaster::newPFWork (const char *name, ScopedLockType&) { if (mPathFindThread < 2) { - ++mPathFindThread; - app_.getJobQueue().addJob ( + if (app_.getJobQueue().addJob ( jtUPDATE_PF, name, - [this] (Job& j) { updatePaths(j); }); + [this] (Job& j) { updatePaths(j); })) + { + ++mPathFindThread; + } } + // If we're stopping don't give callers the expectation that their + // request will be fulfilled, even if it may be serviced. + return mPathFindThread > 0 && !isStopping(); } std::recursive_mutex& @@ -1520,7 +1524,7 @@ LedgerMaster::shouldAcquire ( } // Try to publish ledgers, acquire missing ledgers -void LedgerMaster::doAdvance () +void LedgerMaster::doAdvance (ScopedLockType& sl) { // TODO NIKB: simplify and unindent this a bit! @@ -1707,9 +1711,8 @@ void LedgerMaster::doAdvance () } } - progress = true; app_.getOPs().clearNeedNetworkLedger(); - newPFWork ("pf:newLedger"); + progress = newPFWork ("pf:newLedger", sl); } if (progress) mAdvanceWork = true; diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp index 23da237b59..f846a92a2b 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp +++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp @@ -82,6 +82,11 @@ void TransactionAcquire::done () uint256 const& hash (mHash); std::shared_ptr const& map (mMap); auto const pap = &app_; + // Note that, when we're in the process of shutting down, addJob() + // may reject the request. If that happens then giveSet() will + // not be called. That's fine. According to David the giveSet() call + // just updates the consensus and related structures when we acquire + // a transaction set. No need to update them if we're shutting down. app_.getJobQueue().addJob (jtTXN_DATA, "completeAcquire", [pap, hash, map](Job&) { diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index a19ea27dbb..d952b1736d 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -905,29 +905,27 @@ public: if (timer == m_sweepTimer) { - // VFALCO TODO Move all this into doSweep - - if (! config_->standalone()) - { - boost::filesystem::space_info space = - boost::filesystem::space (config_->legacy ("database_path")); - - // VFALCO TODO Give this magic constant a name and move it into a well documented header - // - if (space.available < (512 * 1024 * 1024)) - { - JLOG(m_journal.fatal()) - << "Remaining free disk space is less than 512MB"; - signalStop (); - } - } - - m_jobQueue->addJob(jtSWEEP, "sweep", [this] (Job&) { doSweep(); }); + m_jobQueue->addJob( + jtSWEEP, "sweep", [this] (Job&) { doSweep(); }); } } void doSweep () { + if (! config_->standalone()) + { + boost::filesystem::space_info space = + boost::filesystem::space (config_->legacy ("database_path")); + + constexpr std::uintmax_t bytes512M = 512 * 1024 * 1024; + if (space.available < (bytes512M)) + { + JLOG(m_journal.fatal()) + << "Remaining free disk space is less than 512MB"; + signalStop (); + } + } + // VFALCO NOTE Does the order of calls matter? // VFALCO TODO fix the dependency inversion using an observer, // have listeners register for "onSweep ()" notification. @@ -943,7 +941,7 @@ public: family().treecache().sweep(); cachedSLEs_.expire(); - // VFALCO NOTE does the call to sweep() happen on another thread? + // Set timer to do another sweep later. m_sweepTimer.setExpiration ( std::chrono::seconds {config_->getSize (siSweepInterval)}); } diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp index 54351c4712..dfe0121dfd 100644 --- a/src/ripple/app/main/NodeStoreScheduler.cpp +++ b/src/ripple/app/main/NodeStoreScheduler.cpp @@ -25,8 +25,6 @@ namespace ripple { NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent) : Stoppable ("NodeStoreScheduler", parent) - , m_jobQueue (nullptr) - , m_taskCount (0) { } @@ -48,16 +46,29 @@ void NodeStoreScheduler::onChildrenStopped () void NodeStoreScheduler::scheduleTask (NodeStore::Task& task) { ++m_taskCount; - m_jobQueue->addJob ( + if (!m_jobQueue->addJob ( jtWRITE, "NodeObject::store", - [this, &task] (Job&) { doTask(task); }); + [this, &task] (Job&) { doTask(task); })) + { + // Job not added, presumably because we're shutting down. + // Recover by executing the task synchronously. + doTask (task); + } } void NodeStoreScheduler::doTask (NodeStore::Task& task) { task.performScheduledTask (); - if ((--m_taskCount == 0) && isStopping()) + + // NOTE: It feels a bit off that there are two different methods that + // call stopped(): onChildrenStopped() and doTask(). There's a + // suspicion that, as long as the Stoppable tree is configured + // correctly, this call to stopped() in doTask() can never occur. + // + // However, until we increase our confidence that the suspicion is + // correct, we will leave this code in place. + if ((--m_taskCount == 0) && isStopping() && areChildrenStopped()) stopped(); } diff --git a/src/ripple/app/main/NodeStoreScheduler.h b/src/ripple/app/main/NodeStoreScheduler.h index e6689011bf..657fe6499e 100644 --- a/src/ripple/app/main/NodeStoreScheduler.h +++ b/src/ripple/app/main/NodeStoreScheduler.h @@ -49,8 +49,8 @@ public: private: void doTask (NodeStore::Task& task); - JobQueue* m_jobQueue; - std::atomic m_taskCount; + JobQueue* m_jobQueue {nullptr}; + std::atomic m_taskCount {0}; }; } // ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index c749b73306..e0e84c80b5 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -44,7 +44,6 @@ #include #include #include -#include #include #include #include @@ -218,8 +217,7 @@ public: ~NetworkOPsImp() override { - jobCounter_.join(); - // this clear() is necessary to ensure the shared_ptrs in this map get + // This clear() is necessary to ensure the shared_ptrs in this map get // destroyed NOW because the objects in this map invoke methods on this // class when they are destroyed mRpcSubMap.clear(); @@ -486,9 +484,6 @@ public: m_heartbeatTimer.cancel(); m_clusterTimer.cancel(); - // Wait until all our in-flight Jobs are completed. - jobCounter_.join(); - stopped (); } @@ -540,7 +535,6 @@ private: DeadlineTimer m_heartbeatTimer; DeadlineTimer m_clusterTimer; - JobCounter jobCounter_; RCLConsensus mConsensus; @@ -657,14 +651,14 @@ void NetworkOPsImp::onDeadlineTimer (DeadlineTimer& timer) { if (timer == m_heartbeatTimer) { - m_job_queue.addCountedJob ( - jtNETOP_TIMER, "NetOPs.heartbeat", jobCounter_, + m_job_queue.addJob ( + jtNETOP_TIMER, "NetOPs.heartbeat", [this] (Job&) { processHeartbeatTimer(); }); } else if (timer == m_clusterTimer) { - m_job_queue.addCountedJob ( - jtNETOP_CLUSTER, "NetOPs.cluster", jobCounter_, + m_job_queue.addJob ( + jtNETOP_CLUSTER, "NetOPs.cluster", [this] (Job&) { processClusterTimer(); }); } } @@ -837,8 +831,8 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr const& iTrans auto tx = std::make_shared ( trans, reason, app_); - m_job_queue.addCountedJob ( - jtTRANSACTION, "submitTxn", jobCounter_, + m_job_queue.addJob ( + jtTRANSACTION, "submitTxn", [this, tx] (Job&) { auto t = tx; processTransaction(t, false, false, FailHard::no); @@ -904,8 +898,8 @@ void NetworkOPsImp::doTransactionAsync (std::shared_ptr transaction if (mDispatchState == DispatchState::none) { - if (m_job_queue.addCountedJob ( - jtBATCH, "transactionBatch", jobCounter_, + if (m_job_queue.addJob ( + jtBATCH, "transactionBatch", [this] (Job&) { transactionBatch(); })) { mDispatchState = DispatchState::scheduled; @@ -939,8 +933,8 @@ void NetworkOPsImp::doTransactionSync (std::shared_ptr transaction, if (mTransactions.size()) { // More transactions need to be applied, but by another job. - if (m_job_queue.addCountedJob ( - jtBATCH, "transactionBatch", jobCounter_, + if (m_job_queue.addJob ( + jtBATCH, "transactionBatch", [this] (Job&) { transactionBatch(); })) { mDispatchState = DispatchState::scheduled; @@ -2466,8 +2460,8 @@ void NetworkOPsImp::reportFeeChange () // only schedule the job if something has changed if (f != mLastFeeSummary) { - m_job_queue.addCountedJob ( - jtCLIENT, "reportFeeChange->pubServer", jobCounter_, + m_job_queue.addJob ( + jtCLIENT, "reportFeeChange->pubServer", [this] (Job&) { pubServer(); }); } } @@ -3307,10 +3301,6 @@ NetworkOPs::NetworkOPs (Stoppable& parent) { } -NetworkOPs::~NetworkOPs () -{ -} - //------------------------------------------------------------------------------ diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index c47e3ffb5f..555605a3c1 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -20,14 +20,14 @@ #ifndef RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED #define RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED -#include -#include #include #include +#include +#include #include #include +#include #include -#include #include #include @@ -96,7 +96,7 @@ public: } public: - virtual ~NetworkOPs () = 0; + ~NetworkOPs () override = default; //-------------------------------------------------------------------------- // diff --git a/src/ripple/app/paths/PathRequest.h b/src/ripple/app/paths/PathRequest.h index 4d2fa1ecf7..f5d2254db2 100644 --- a/src/ripple/app/paths/PathRequest.h +++ b/src/ripple/app/paths/PathRequest.h @@ -70,7 +70,7 @@ public: beast::Journal journal); // ripple_path_find semantics - // Completion function is called + // Completion function is called after path update is complete PathRequest ( Application& app, std::function const& completion, @@ -83,6 +83,8 @@ public: bool isNew (); bool needsUpdate (bool newOnly, LedgerIndex index); + + // Called when the PathRequest update is complete. void updateComplete (); std::pair doCreate ( diff --git a/src/ripple/app/paths/PathRequests.cpp b/src/ripple/app/paths/PathRequests.cpp index 69d76ceee9..0c71c48b46 100644 --- a/src/ripple/app/paths/PathRequests.cpp +++ b/src/ripple/app/paths/PathRequests.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -246,7 +248,12 @@ PathRequests::makeLegacyPathRequest( else { insertPathRequest (req); - app_.getLedgerMaster().newPathRequest(); + if (! app_.getLedgerMaster().newPathRequest()) + { + // The newPathRequest failed. Tell the caller. + result.second = rpcError (rpcTOO_BUSY); + req.reset(); + } } return std::move (result.second); diff --git a/src/ripple/app/paths/PathRequests.h b/src/ripple/app/paths/PathRequests.h index c8359566fb..fb5cc614e0 100644 --- a/src/ripple/app/paths/PathRequests.h +++ b/src/ripple/app/paths/PathRequests.h @@ -33,6 +33,7 @@ namespace ripple { class PathRequests { public: + /** A collection of all PathRequest instances. */ PathRequests (Application& app, beast::Journal journal, beast::insight::Collector::ptr const& collector) : app_ (app) @@ -43,6 +44,11 @@ public: mFull = collector->make_event ("pathfind_full"); } + /** Update all of the contained PathRequest instances. + + @param ledger Ledger we are pathfinding in. + @param shouldCancel Invocable that returns whether to cancel. + */ void updateAll (std::shared_ptr const& ledger, Job::CancelCallback shouldCancel); diff --git a/src/ripple/core/Coro.ipp b/src/ripple/core/Coro.ipp index 8eeadd32f3..73198e922b 100644 --- a/src/ripple/core/Coro.ipp +++ b/src/ripple/core/Coro.ipp @@ -48,7 +48,9 @@ inline JobQueue::Coro:: ~Coro() { +#ifndef NDEBUG assert(finished_); +#endif } inline @@ -64,7 +66,7 @@ yield() const } inline -void +bool JobQueue::Coro:: post() { @@ -74,23 +76,76 @@ post() } // sp keeps 'this' alive - jq_.addJob(type_, name_, + if (jq_.addJob(type_, name_, [this, sp = shared_from_this()](Job&) { - { - std::lock_guard lock(jq_.m_mutex); - --jq_.nSuspend_; - } - auto saved = detail::getLocalValues().release(); - detail::getLocalValues().reset(&lvs_); - std::lock_guard lock(mutex_); - coro_(); - detail::getLocalValues().release(); - detail::getLocalValues().reset(saved); - std::lock_guard lk(mutex_run_); - running_ = false; - cv_.notify_all(); - }); + resume(); + })) + { + return true; + } + + // The coroutine will not run. Clean up running_. + std::lock_guard lk(mutex_run_); + running_ = false; + cv_.notify_all(); + return false; +} + +inline +void +JobQueue::Coro:: +resume() +{ + { + std::lock_guard lk(mutex_run_); + running_ = true; + } + { + std::lock_guard lock(jq_.m_mutex); + --jq_.nSuspend_; + } + auto saved = detail::getLocalValues().release(); + detail::getLocalValues().reset(&lvs_); + std::lock_guard lock(mutex_); + assert (coro_); + coro_(); + detail::getLocalValues().release(); + detail::getLocalValues().reset(saved); + std::lock_guard lk(mutex_run_); + running_ = false; + cv_.notify_all(); +} + +inline +bool +JobQueue::Coro:: +runnable() const +{ + return static_cast(coro_); +} + +inline +void +JobQueue::Coro:: +expectEarlyExit() +{ +#ifndef NDEBUG + if (! finished_) +#endif + { + // expectEarlyExit() must only ever be called from outside the + // Coro's stack. It you're inside the stack you can simply return + // and be done. + // + // That said, since we're outside the Coro's stack, we need to + // decrement the nSuspend that the Coro's call to yield caused. + std::lock_guard lock(jq_.m_mutex); + --jq_.nSuspend_; +#ifndef NDEBUG + finished_ = true; +#endif + } } inline diff --git a/src/ripple/core/JobCounter.h b/src/ripple/core/JobCounter.h index 834f3bd677..a80e248242 100644 --- a/src/ripple/core/JobCounter.h +++ b/src/ripple/core/JobCounter.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_CORE_JOB_COUNTER_H_INCLUDED #define RIPPLE_CORE_JOB_COUNTER_H_INCLUDED +#include #include #include #include @@ -126,18 +127,31 @@ public: /** Destructor verifies all in-flight jobs are complete. */ ~JobCounter() { - join(); + using namespace std::chrono_literals; + join ("JobCounter", 1s, debugLog()); } - /** Returns once all counted in-flight Jobs are destroyed. */ - void join() + /** Returns once all counted in-flight Jobs are destroyed. + + @param name Name reported if join time exceeds wait. + @param wait If join() exceeds this duration report to Journal. + @param j Journal written to if wait is exceeded. + */ + void join (char const* name, + std::chrono::milliseconds wait, beast::Journal j) { std::unique_lock lock {mutex_}; waitForJobs_ = true; if (jobCount_ > 0) { - allJobsDoneCond_.wait ( - lock, [this] { return jobCount_ == 0; }); + if (! allJobsDoneCond_.wait_for ( + lock, wait, [this] { return jobCount_ == 0; })) + { + if (auto stream = j.error()) + stream << name << " waiting for JobCounter::join()."; + allJobsDoneCond_.wait (lock, [this] { return jobCount_ == 0; }); + } + } } diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index cf2a873197..8c46c10d24 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -97,9 +97,31 @@ public: When the job runs, the coroutine's stack is restored and execution continues at the beginning of coroutine function or the statement after the previous call to yield. - Undefined behavior if called consecutively without a corresponding yield. + Undefined behavior if called after the coroutine has completed + with a return (as opposed to a yield()). + Undefined behavior if post() or resume() called consecutively + without a corresponding yield. + + @return true if the Coro's job is added to the JobQueue. */ - void post(); + bool post(); + + /** Resume coroutine execution. + Effects: + The coroutine continues execution from where it last left off + using this same thread. + Undefined behavior if called after the coroutine has completed + with a return (as opposed to a yield()). + Undefined behavior if resume() or post() called consecutively + without a corresponding yield. + */ + void resume(); + + /** Returns true if the Coro is still runnable (has not returned). */ + bool runnable() const; + + /** Once called, the Coro allows early exit without an assert. */ + void expectEarlyExit(); /** Waits until coroutine returns from the user function. */ void join(); @@ -113,29 +135,23 @@ public: /** Adds a job to the JobQueue. - @param t The type of job. + @param type The type of job. @param name Name of the job. - @param func std::function with signature void (Job&). Called when the job is executed. - */ - void addJob (JobType type, std::string const& name, JobFunction const& func); - - /** Adds a counted job to the JobQueue. - - @param t The type of job. - @param name Name of the job. - @param counter JobCounter for counting the Job. @param jobHandler Lambda with signature void (Job&). Called when the job is executed. - @return true if JobHandler added, false if JobCounter is already joined. + @return true if jobHandler added to queue. */ - template - bool addCountedJob (JobType type, - std::string const& name, JobCounter& counter, JobHandler&& jobHandler) + template ()(std::declval())), void>::value>> + bool addJob (JobType type, + std::string const& name, JobHandler&& jobHandler) { - if (auto optionalCountedJob = counter.wrap (std::move (jobHandler))) + if (auto optionalCountedJob = + Stoppable::jobCounter().wrap (std::forward(jobHandler))) { - addJob (type, name, std::move (*optionalCountedJob)); - return true; + return addRefCountedJob ( + type, name, std::move (*optionalCountedJob)); } return false; } @@ -145,9 +161,11 @@ public: @param t The type of job. @param name Name of the job. @param f Has a signature of void(std::shared_ptr). Called when the job executes. + + @return shared_ptr to posted Coro. nullptr if post was not successful. */ template - void postCoro (JobType t, std::string const& name, F&& f); + std::shared_ptr postCoro (JobType t, std::string const& name, F&& f); /** Jobs waiting at this priority. */ @@ -227,6 +245,16 @@ private: // Signals the service stopped if the stopped condition is met. void checkStopped (std::lock_guard const& lock); + // Adds a reference counted job to the JobQueue. + // + // param type The type of job. + // param name Name of the job. + // param func std::function with signature void (Job&). Called when the job is executed. + // + // return true if func added to queue. + bool addRefCountedJob ( + JobType type, std::string const& name, JobFunction const& func); + // Signals an added Job for processing. // // Pre-conditions: @@ -311,15 +339,15 @@ private: other requests while the RPC command completes its work asynchronously. postCoro() creates a Coro object. When the Coro ctor is called, and its - coro_ member is initialized(a boost::coroutines::pull_type), execution + coro_ member is initialized (a boost::coroutines::pull_type), execution automatically passes to the coroutine, which we don't want at this point, since we are still in the handler thread context. It's important to note here that construction of a boost pull_type automatically passes execution to the coroutine. A pull_type object automatically generates a push_type that is - used as the as a parameter(do_yield) in the signature of the function the + passed as a parameter (do_yield) in the signature of the function the pull_type was created with. This function is immediately called during coro_ construction and within it, Coro::yield_ is assigned the push_type - parameter(do_yield) address and called(yield()) so we can return execution + parameter (do_yield) address and called (yield()) so we can return execution back to the caller's stack. postCoro() then calls Coro::post(), which schedules a job on the job @@ -368,15 +396,23 @@ private: namespace ripple { template -void JobQueue::postCoro (JobType t, std::string const& name, F&& f) +std::shared_ptr +JobQueue::postCoro (JobType t, std::string const& name, F&& f) { /* First param is a detail type to make construction private. Last param is the function the coroutine runs. Signature of void(std::shared_ptr). */ - auto const coro = std::make_shared( + auto coro = std::make_shared( Coro_create_t{}, *this, t, name, std::forward(f)); - coro->post(); + if (! coro->post()) + { + // The Coro was not successfully posted. Disable it so it's destructor + // can run with no negative side effects. Then destroy it. + coro->expectEarlyExit(); + coro.reset(); + } + return coro; } } diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index fde3eb5912..aae8adef07 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -167,6 +168,30 @@ class RootStoppable; when the last thread is about to exit it would call stopped(). @note A Stoppable may not be restarted. + + The form of the Stoppable tree in the rippled application evolves as + the source code changes and reacts to new demands. As of March in 2017 + the Stoppable tree had this form: + + @code + + Application + | + +--------------------+--------------------+ + | | | + LoadManager SHAMapStore NodeStoreScheduler + | + JobQueue + | + +-----------+-----------+-----------+-----------+----+--------+ + | | | | | | + | NetworkOPs | InboundLedgers | OrderbookDB + | | | + Overlay InboundTransactions LedgerMaster + | | + PeerFinder LedgerCleaner + + @endcode */ /** @{ */ class Stoppable @@ -190,6 +215,9 @@ public: /** Returns `true` if all children have stopped. */ bool areChildrenStopped () const; + /* JobQueue uses this method for Job counting. */ + inline JobCounter& jobCounter (); + /** Sleep or wake up on stop. @return `true` if we are stopping @@ -282,9 +310,8 @@ private: std::string m_name; RootStoppable& m_root; Child m_child; - std::atomic m_started; - std::atomic m_stopped; - std::atomic m_childrenStopped; + std::atomic m_stopped {false}; + std::atomic m_childrenStopped {false}; Children m_children; beast::WaitableEvent m_stoppedEvent; }; @@ -296,7 +323,7 @@ class RootStoppable : public Stoppable public: explicit RootStoppable (std::string name); - ~RootStoppable () = default; + ~RootStoppable (); bool isStopping() const; @@ -326,6 +353,18 @@ public: */ void stop (beast::Journal j); + /** Return true if start() was ever called. */ + bool started () const + { + return m_started; + } + + /* JobQueue uses this method for Job counting. */ + JobCounter& rootJobCounter () + { + return jobCounter_; + } + /** Sleep or wake up on stop. @return `true` if we are stopping @@ -346,15 +385,24 @@ private: */ bool stopAsync(beast::Journal j); - std::atomic m_prepared; - std::atomic m_calledStop; + std::atomic m_prepared {false}; + std::atomic m_started {false}; + std::atomic m_calledStop {false}; std::mutex m_; std::condition_variable c_; + JobCounter jobCounter_; }; /** @} */ //------------------------------------------------------------------------------ +JobCounter& Stoppable::jobCounter () +{ + return m_root.rootJobCounter(); +} + +//------------------------------------------------------------------------------ + template bool RootStoppable::alertable_sleep_for( diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index ffb919a437..f68897e802 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -67,8 +67,8 @@ JobQueue::collect () job_count = m_jobSet.size (); } -void -JobQueue::addJob (JobType type, std::string const& name, +bool +JobQueue::addRefCountedJob (JobType type, std::string const& name, JobFunction const& func) { assert (type != jtINVALID); @@ -76,7 +76,7 @@ JobQueue::addJob (JobType type, std::string const& name, auto iter (m_jobData.find (type)); assert (iter != m_jobData.end ()); if (iter == m_jobData.end ()) - return; + return false; JobTypeData& data (iter->second); @@ -108,6 +108,7 @@ JobQueue::addJob (JobType type, std::string const& name, data.load (), func, m_cancelCallback))); queueJob (*result.first, lock); } + return true; } int diff --git a/src/ripple/core/impl/SociDB.cpp b/src/ripple/core/impl/SociDB.cpp index aeae7b8729..c43daa05b6 100644 --- a/src/ripple/core/impl/SociDB.cpp +++ b/src/ripple/core/impl/SociDB.cpp @@ -226,7 +226,13 @@ private: running_ = true; } - jobQueue_.addJob (jtWAL, "WAL", [this] (Job&) { checkpoint(); }); + // If the Job is not added to the JobQueue then we're not running_. + if (! jobQueue_.addJob ( + jtWAL, "WAL", [this] (Job&) { checkpoint(); })) + { + std::lock_guard lock (mutex_); + running_ = false; + } } void checkpoint () diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp index 862e592a01..ecd55475ef 100644 --- a/src/ripple/core/impl/Stoppable.cpp +++ b/src/ripple/core/impl/Stoppable.cpp @@ -26,9 +26,6 @@ Stoppable::Stoppable (std::string name, RootStoppable& root) : m_name (std::move (name)) , m_root (root) , m_child (this) - , m_started (false) - , m_stopped (false) - , m_childrenStopped (false) { } @@ -36,9 +33,6 @@ Stoppable::Stoppable (std::string name, Stoppable& parent) : m_name (std::move (name)) , m_root (parent.m_root) , m_child (this) - , m_started (false) - , m_stopped (false) - , m_childrenStopped (false) { // Must not have stopping parent. assert (! parent.isStopping()); @@ -48,8 +42,8 @@ Stoppable::Stoppable (std::string name, Stoppable& parent) Stoppable::~Stoppable () { - // Children must be stopped. - assert (!m_started || m_childrenStopped); + // Either we must not have started, or Children must be stopped. + assert (!m_root.started() || m_childrenStopped); } bool Stoppable::isStopping() const @@ -113,12 +107,13 @@ void Stoppable::stopAsyncRecursive (beast::Journal j) auto const start = high_resolution_clock::now(); onStop (); auto const ms = duration_cast( - high_resolution_clock::now() - start).count(); + high_resolution_clock::now() - start); #ifdef NDEBUG - if (ms >= 10) + using namespace std::chrono_literals; + if (ms >= 10ms) if (auto stream = j.fatal()) - stream << m_name << "::onStop took " << ms << "ms"; + stream << m_name << "::onStop took " << ms.count() << "ms"; #else (void)ms; #endif @@ -159,11 +154,15 @@ void Stoppable::stopRecursive (beast::Journal j) RootStoppable::RootStoppable (std::string name) : Stoppable (std::move (name), *this) - , m_prepared (false) - , m_calledStop (false) { } +RootStoppable::~RootStoppable () +{ + using namespace std::chrono_literals; + jobCounter_.join(m_name.c_str(), 1s, debugLog()); +} + bool RootStoppable::isStopping() const { return m_calledStop; @@ -194,7 +193,7 @@ void RootStoppable::stop (beast::Journal j) stopRecursive (j); } -bool RootStoppable::stopAsync(beast::Journal j) +bool RootStoppable::stopAsync (beast::Journal j) { bool alreadyCalled; { @@ -211,6 +210,11 @@ bool RootStoppable::stopAsync(beast::Journal j) stream << "Stoppable::stop called again"; return false; } + + // Wait until all in-flight JobQueue Jobs are completed. + using namespace std::chrono_literals; + jobCounter_.join (m_name.c_str(), 1s, j); + c_.notify_all(); stopAsyncRecursive(j); return true; diff --git a/src/ripple/net/impl/RPCSub.cpp b/src/ripple/net/impl/RPCSub.cpp index ac7b745015..a08d382941 100644 --- a/src/ripple/net/impl/RPCSub.cpp +++ b/src/ripple/net/impl/RPCSub.cpp @@ -93,11 +93,9 @@ public: if (!mSending) { // Start a sending thread. - mSending = true; - JLOG (j_.info()) << "RPCCall::fromNetwork start"; - m_jobQueue.addJob ( + mSending = m_jobQueue.addJob ( jtCLIENT, "RPCSub::sendThread", [this] (Job&) { sendThread(); }); diff --git a/src/ripple/rpc/handlers/RipplePathFind.cpp b/src/ripple/rpc/handlers/RipplePathFind.cpp index 0e59886fea..48fab268f6 100644 --- a/src/ripple/rpc/handlers/RipplePathFind.cpp +++ b/src/ripple/rpc/handlers/RipplePathFind.cpp @@ -55,9 +55,93 @@ Json::Value doRipplePathFind (RPC::Context& context) PathRequest::pointer request; lpLedger = context.ledgerMaster.getClosedLedger(); + // It doesn't look like there's much odd happening here, but you should + // be aware this code runs in a JobQueue::Coro, which is a coroutine. + // And we may be flipping around between threads. Here's an overview: + // + // 1. We're running doRipplePathFind() due to a call to + // ripple_path_find. doRipplePathFind() is currently running + // inside of a JobQueue::Coro using a JobQueue thread. + // + // 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the + // path-finding request. That request will (probably) run at some + // indeterminate future time on a (probably different) JobQueue + // thread. + // + // 3. As a continuation from that path-finding JobQueue thread, the + // coroutine we're currently running in (!) is posted to the + // JobQueue. Because it is a continuation, that post won't + // happen until the path-finding request completes. + // + // 4. Once the continuation is enqueued, and we have reason to think + // the path-finding job is likely to run, then the coroutine we're + // running in yield()s. That means it surrenders its thread in + // the JobQueue. The coroutine is suspended, but ready to run, + // because it is kept resident by a shared_ptr in the + // path-finding continuation. + // + // 5. If all goes well then path-finding runs on a JobQueue thread + // and executes its continuation. The continuation posts this + // same coroutine (!) to the JobQueue. + // + // 6. When the JobQueue calls this coroutine, this coroutine resumes + // from the line below the coro->yield() and returns the + // path-finding result. + // + // With so many moving parts, what could go wrong? + // + // Just in terms of the JobQueue refusing to add jobs at shutdown + // there are two specific things that can go wrong. + // + // 1. The path-finding Job queued by makeLegacyPathRequest() might be + // rejected (because we're shutting down). + // + // Fortunately this problem can be addressed by looking at the + // return value of makeLegacyPathRequest(). If + // makeLegacyPathRequest() cannot get a thread to run the path-find + // on, then it returns an empty request. + // + // 2. The path-finding job might run, but the Coro::post() might be + // rejected by the JobQueue (because we're shutting down). + // + // We handle this case by resuming (not posting) the Coro. + // By resuming the Coro, we allow the Coro to run to completion + // on the current thread instead of requiring that it run on a + // new thread from the JobQueue. + // + // Both of these failure modes are hard to recreate in a unit test + // because they are so dependent on inter-thread timing. However + // the failure modes can be observed by synchronously (inside the + // rippled source code) shutting down the application. The code to + // do so looks like this: + // + // context.app.signalStop(); + // while (! context.app.getJobQueue().jobCounter().joined()) { } + // + // The first line starts the process of shutting down the app. + // The second line waits until no more jobs can be added to the + // JobQueue before letting the thread continue. + // + // May 2017 jvResult = context.app.getPathRequests().makeLegacyPathRequest ( - request, std::bind(&JobQueue::Coro::post, context.coro), - context.consumer, lpLedger, context.params); + request, + [&context] () { + // Copying the shared_ptr keeps the coroutine alive up + // through the return. Otherwise the storage under the + // captured reference could evaporate when we return from + // coroCopy->resume(). This is not strictly necessary, but + // will make maintenance easier. + std::shared_ptr coroCopy {context.coro}; + if (!coroCopy->post()) + { + // The post() failed, so we won't get a thread to let + // the Coro finish. We'll call Coro::resume() so the + // Coro can finish on our thread. Otherwise the + // application will hang on shutdown. + coroCopy->resume(); + } + }, + context.consumer, lpLedger, context.params); if (request) { context.coro->yield(); diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index b94801740f..0e806c134b 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -309,11 +309,20 @@ ServerHandlerImp::onRequest (Session& session) return; } - m_jobQueue.postCoro(jtCLIENT, "RPC-Client", - [this, detach = session.detach()](std::shared_ptr c) + std::shared_ptr detachedSession = session.detach(); + auto const postResult = m_jobQueue.postCoro(jtCLIENT, "RPC-Client", + [this, detachedSession](std::shared_ptr coro) { - processSession(detach, c); + processSession(detachedSession, coro); }); + if (postResult == nullptr) + { + // The coroutine was rejected, probably because we're shutting down. + HTTPReply(503, "Service Unavailable", + makeOutput(*detachedSession), app_.journal("RPC")); + detachedSession->close(true); + return; + } } void @@ -350,12 +359,12 @@ ServerHandlerImp::onWSMessage( JLOG(m_journal.trace()) << "Websocket received '" << jv << "'"; - m_jobQueue.postCoro(jtCLIENT, "WS-Client", - [this, session = std::move(session), - jv = std::move(jv)](auto const& c) + auto const postResult = m_jobQueue.postCoro(jtCLIENT, "WS-Client", + [this, session, jv = std::move(jv)] + (std::shared_ptr const& coro) { auto const jr = - this->processSession(session, c, jv); + this->processSession(session, coro, jv); auto const s = to_string(jr); auto const n = s.length(); beast::multi_buffer sb(n); @@ -365,6 +374,11 @@ ServerHandlerImp::onWSMessage( StreambufWSMsg>(std::move(sb))); session->complete(); }); + if (postResult == nullptr) + { + // The coroutine was rejected, probably because we're shutting down. + session->close(); + } } void diff --git a/src/test/core/JobCounter_test.cpp b/src/test/core/JobCounter_test.cpp index 435fccecab..4cec0249ac 100644 --- a/src/test/core/JobCounter_test.cpp +++ b/src/test/core/JobCounter_test.cpp @@ -20,16 +20,22 @@ #include #include #include +#include #include #include #include namespace ripple { +namespace test { //------------------------------------------------------------------------------ class JobCounter_test : public beast::unit_test::suite { + // We're only using Env for its Journal. + jtx::Env env {*this}; + beast::Journal j {env.app().journal ("JobCounter_test")}; + void testWrap() { // Verify reference counting. @@ -41,8 +47,8 @@ class JobCounter_test : public beast::unit_test::suite // wrapped1 should be callable with a Job. { - Job j; - (*wrapped1)(j); + Job job; + (*wrapped1)(job); } { // Copy should increase reference count. @@ -66,7 +72,8 @@ class JobCounter_test : public beast::unit_test::suite BEAST_EXPECT (jobCounter.count() == 0); // Join with 0 count should not stall. - jobCounter.join(); + using namespace std::chrono_literals; + jobCounter.join("testWrap", 1ms, j); // Wrapping a Job after join() should return boost::none. BEAST_EXPECT (jobCounter.wrap ([] (Job&) {}) == boost::none); @@ -83,21 +90,22 @@ class JobCounter_test : public beast::unit_test::suite // Calling join() now should stall, so do it on a different thread. std::atomic threadExited {false}; - std::thread localThread ([&jobCounter, &threadExited] () + std::thread localThread ([&jobCounter, &threadExited, this] () { // Should stall after calling join. - jobCounter.join(); + using namespace std::chrono_literals; + jobCounter.join("testWaitOnJoin", 1ms, j); threadExited.store (true); }); // Wait for the thread to call jobCounter.join(). while (! jobCounter.joined()); - // The thread should still be active after waiting a millisecond. + // The thread should still be active after waiting 5 milliseconds. // This is not a guarantee that join() stalled the thread, but it // improves confidence. using namespace std::chrono_literals; - std::this_thread::sleep_for (1ms); + std::this_thread::sleep_for (5ms); BEAST_EXPECT (threadExited == false); // Destroy the Job and expect the thread to exit (asynchronously). @@ -119,4 +127,5 @@ public: BEAST_DEFINE_TESTSUITE(JobCounter, core, ripple); -} +} // test +} // ripple diff --git a/src/test/core/JobQueue_test.cpp b/src/test/core/JobQueue_test.cpp new file mode 100644 index 0000000000..11f355dd62 --- /dev/null +++ b/src/test/core/JobQueue_test.cpp @@ -0,0 +1,154 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2017 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +namespace ripple { +namespace test { + +//------------------------------------------------------------------------------ + +class JobQueue_test : public beast::unit_test::suite +{ + void testAddJob() + { + jtx::Env env {*this}; + + JobQueue& jQueue = env.app().getJobQueue(); + { + // addJob() should run the Job (and return true). + std::atomic jobRan {false}; + BEAST_EXPECT (jQueue.addJob (jtCLIENT, "JobAddTest1", + [&jobRan] (Job&) { jobRan = true; }) == true); + + // Wait for the Job to run. + while (jobRan == false); + } + { + // If the JobQueue's JobCounter is join()ed we should no + // longer be able to add Jobs (and calling addJob() should + // return false). + using namespace std::chrono_literals; + beast::Journal j {env.app().journal ("JobQueue_test")}; + JobCounter& jCounter = jQueue.jobCounter(); + jCounter.join("JobQueue_test", 1s, j); + + // The Job should never run, so having the Job access this + // unprotected variable on the stack should be completely safe. + // Not recommended for the faint of heart... + bool unprotected; + BEAST_EXPECT (jQueue.addJob (jtCLIENT, "JobAddTest2", + [&unprotected] (Job&) { unprotected = false; }) == false); + } + } + + void testPostCoro() + { + jtx::Env env {*this}; + + JobQueue& jQueue = env.app().getJobQueue(); + { + // Test repeated post()s until the Coro completes. + std::atomic yieldCount {0}; + auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest1", + [&yieldCount] (std::shared_ptr const& coroCopy) + { + while (++yieldCount < 4) + coroCopy->yield(); + }); + BEAST_EXPECT (coro != nullptr); + + // Wait for the Job to run and yield. + while (yieldCount == 0); + + // Now re-post until the Coro says it is done. + int old = yieldCount; + while (coro->runnable()) + { + BEAST_EXPECT (coro->post()); + while (old == yieldCount) { } + coro->join(); + BEAST_EXPECT (++old == yieldCount); + } + BEAST_EXPECT (yieldCount == 4); + } + { + // Test repeated resume()s until the Coro completes. + int yieldCount {0}; + auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest2", + [&yieldCount] (std::shared_ptr const& coroCopy) + { + while (++yieldCount < 4) + coroCopy->yield(); + }); + if (! coro) + { + // There's no good reason we should not get a Coro, but we + // can't continue without one. + BEAST_EXPECT (false); + return; + } + + // Wait for the Job to run and yield. + coro->join(); + + // Now resume until the Coro says it is done. + int old = yieldCount; + while (coro->runnable()) + { + coro->resume(); // Resume runs synchronously on this thread. + BEAST_EXPECT (++old == yieldCount); + } + BEAST_EXPECT (yieldCount == 4); + } + { + // If the JobQueue's JobCounter is join()ed we should no + // longer be able to add a Coro (and calling postCoro() should + // return false). + using namespace std::chrono_literals; + beast::Journal j {env.app().journal ("JobQueue_test")}; + JobCounter& jCounter = jQueue.jobCounter(); + jCounter.join("JobQueue_test", 1s, j); + + // The Coro should never run, so having the Coro access this + // unprotected variable on the stack should be completely safe. + // Not recommended for the faint of heart... + bool unprotected; + auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest3", + [&unprotected] (std::shared_ptr const&) + { unprotected = false; }); + BEAST_EXPECT (coro == nullptr); + } + } + +public: + void run() + { + testAddJob(); + testPostCoro(); + } +}; + +BEAST_DEFINE_TESTSUITE(JobQueue, core, ripple); + +} // test +} // ripple diff --git a/src/test/unity/core_test_unity.cpp b/src/test/unity/core_test_unity.cpp index c6fea7810d..c9803768e5 100644 --- a/src/test/unity/core_test_unity.cpp +++ b/src/test/unity/core_test_unity.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include