Don't use JobQueue during shutdown (RIPD-1356):

If the JobQueue is used during shutdown, then those Jobs may access
Stoppables after they have already stopped.  This violates the
preconditions of Stoppable and may lead to undefined behavior.

The solution taken here is to reference-count all Jobs in the
JobQueue.  At stop time, all Jobs already in the JobQueue are
allowed to run to completion, but no further Jobs are accepted
into the JobQueue.

If a Job is rejected from the JobQueue (because we are stopping),
then JobQueue::addJob() returns false, so the caller can make any
necessary adjustments.
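
As a sketch of what call sites look like after this change (the job
type, name, and recovery step here are illustrative, not taken from
any one call site):

    // If addJob() returns false we are stopping, the lambda will
    // never run, and the caller must recover on the spot.
    if (! app.getJobQueue().addJob (jtWRITE, "example",
        [] (Job&) { /* work that would run on a JobQueue thread */ }))
    {
        // Shutting down: do the work synchronously, or record that
        // nothing was scheduled.
    }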
Author: Scott Schurr (2017-04-06 17:39:42 -07:00)
Committed by: seelabs
Parent: fc89d2e014
Commit: efe3700f70
29 changed files with 652 additions and 189 deletions


@@ -4511,6 +4511,10 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\test\core\JobQueue_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\test\core\SociDB_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>


@@ -5265,6 +5265,9 @@
<ClCompile Include="..\..\src\test\core\JobCounter_test.cpp">
<Filter>test\core</Filter>
</ClCompile>
<ClCompile Include="..\..\src\test\core\JobQueue_test.cpp">
<Filter>test\core</Filter>
</ClCompile>
<ClCompile Include="..\..\src\test\core\SociDB_test.cpp">
<Filter>test\core</Filter>
</ClCompile>


@@ -59,8 +59,8 @@ RCLValidationsPolicy::onStale(RCLValidation&& v)
if (staleWriting_)
return;
staleWriting_ = true;
app_.getJobQueue().addJob(
// addJob() may return false (Job not added) at shutdown.
staleWriting_ = app_.getJobQueue().addJob(
jtWRITE, "Validations::doStaleWrite", [this](Job&) {
auto event =
app_.getJobQueue().makeLoadEvent(jtDISK, "ValidationWrite");


@@ -1048,25 +1048,22 @@ bool pendSaveValidated (
return true;
}
if (isSynchronous)
    return saveValidatedLedger(app, ledger, isCurrent);
auto job = [ledger, &app, isCurrent] (Job&) {
    saveValidatedLedger(app, ledger, isCurrent);
};
if (isCurrent)
{
    app.getJobQueue().addJob(
        jtPUBLEDGER, "Ledger::pendSave", job);
}
else
{
    app.getJobQueue ().addJob(
        jtPUBOLDLEDGER, "Ledger::pendOldSave", job);
}
return true;
}
JobType const jobType {isCurrent ? jtPUBLEDGER : jtPUBOLDLEDGER};
char const* const jobName {
    isCurrent ? "Ledger::pendSave" : "Ledger::pendOldSave"};
// See if we can use the JobQueue.
if (!isSynchronous &&
    app.getJobQueue().addJob (jobType, jobName,
        [&app, ledger, isCurrent] (Job&) {
            saveValidatedLedger(app, ledger, isCurrent);
        }))
{
    return true;
}
// The JobQueue won't do the Job. Do the save synchronously.
return saveValidatedLedger(app, ledger, isCurrent);
}
void


@@ -203,9 +203,9 @@ public:
void setBuildingLedger (LedgerIndex index);
void tryAdvance ();
void newPathRequest ();
bool newPathRequest (); // Returns true if path request successfully placed.
bool isNewPathRequest ();
void newOrderBookDB ();
bool newOrderBookDB (); // Returns true if able to fulfill request.
bool fixIndex (
LedgerIndex ledgerIndex, LedgerHash const& ledgerHash);
@@ -242,6 +242,9 @@ public:
std::size_t getFetchPackCacheSize () const;
private:
using ScopedLockType = std::lock_guard <std::recursive_mutex>;
using ScopedUnlockType = GenericScopedUnlock <std::recursive_mutex>;
void setValidLedger(
std::shared_ptr<Ledger const> const& l);
void setPubLedger(
@@ -255,8 +258,9 @@ private:
boost::optional<LedgerHash> getLedgerHashForHistory(LedgerIndex index);
std::size_t getNeededValidations();
void advanceThread();
// Try to publish ledgers, acquire missing ledgers
void doAdvance();
// Try to publish ledgers, acquire missing ledgers. Always called with
// m_mutex locked. The passed ScopedLockType is a reminder to callers.
void doAdvance(ScopedLockType&);
bool shouldFetchPack(std::uint32_t seq) const;
bool shouldAcquire(
std::uint32_t const currentLedger,
@@ -268,12 +272,12 @@ private:
findNewLedgersToPublish();
void updatePaths(Job& job);
void newPFWork(const char *name);
// Returns true if work started. Always called with m_mutex locked.
// The passed ScopedLockType is a reminder to callers.
bool newPFWork(const char *name, ScopedLockType&);
private:
using ScopedLockType = std::lock_guard <std::recursive_mutex>;
using ScopedUnlockType = GenericScopedUnlock <std::recursive_mutex>;
Application& app_;
beast::Journal m_journal;


@@ -101,6 +101,15 @@ void OrderBookDB::update(
{
for(auto& sle : ledger->sles)
{
if (isStopping())
{
JLOG (j_.info())
<< "OrderBookDB::update exiting due to isStopping";
std::lock_guard <std::recursive_mutex> sl (mLock);
mSeq = 0;
return;
}
if (sle->getType () == ltDIR_NODE &&
sle->isFieldPresent (sfExchangeRate) &&
sle->getFieldH256 (sfRootIndex) == sle->key())


@@ -919,7 +919,7 @@ LedgerMaster::advanceThread()
try
{
doAdvance();
doAdvance(sl);
}
catch (std::exception const&)
{
@@ -1159,6 +1159,7 @@ LedgerMaster::updatePaths (Job& job)
{
JLOG (m_journal.debug())
<< "Published ledger too old for updating paths";
ScopedLockType ml (m_mutex);
--mPathFindThread;
return;
}
@@ -1193,48 +1194,51 @@ LedgerMaster::updatePaths (Job& job)
}
}
void
bool
LedgerMaster::newPathRequest ()
{
ScopedLockType ml (m_mutex);
mPathFindNewRequest = true;
newPFWork("pf:newRequest");
mPathFindNewRequest = newPFWork("pf:newRequest", ml);
return mPathFindNewRequest;
}
bool
LedgerMaster::isNewPathRequest ()
{
ScopedLockType ml (m_mutex);
if (!mPathFindNewRequest)
return false;
bool const ret = mPathFindNewRequest;
mPathFindNewRequest = false;
return true;
return ret;
}
// If the order book is radically updated, we need to reprocess all
// pathfinding requests.
void
bool
LedgerMaster::newOrderBookDB ()
{
ScopedLockType ml (m_mutex);
mPathLedger.reset();
newPFWork("pf:newOBDB");
return newPFWork("pf:newOBDB", ml);
}
/** A thread needs to be dispatched to handle pathfinding work of some kind.
*/
void
LedgerMaster::newPFWork (const char *name)
bool
LedgerMaster::newPFWork (const char *name, ScopedLockType&)
{
if (mPathFindThread < 2)
{
++mPathFindThread;
app_.getJobQueue().addJob (
if (app_.getJobQueue().addJob (
jtUPDATE_PF, name,
[this] (Job& j) { updatePaths(j); });
[this] (Job& j) { updatePaths(j); }))
{
++mPathFindThread;
}
}
// If we're stopping don't give callers the expectation that their
// request will be fulfilled, even if it may be serviced.
return mPathFindThread > 0 && !isStopping();
}
std::recursive_mutex&
@@ -1520,7 +1524,7 @@ LedgerMaster::shouldAcquire (
}
// Try to publish ledgers, acquire missing ledgers
void LedgerMaster::doAdvance ()
void LedgerMaster::doAdvance (ScopedLockType& sl)
{
// TODO NIKB: simplify and unindent this a bit!
@@ -1707,9 +1711,8 @@ void LedgerMaster::doAdvance ()
}
}
progress = true;
app_.getOPs().clearNeedNetworkLedger();
newPFWork ("pf:newLedger");
progress = newPFWork ("pf:newLedger", sl);
}
if (progress)
mAdvanceWork = true;
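
The ScopedLockType& parameters introduced above follow a
lock-as-argument idiom. A minimal self-contained sketch of the idea
(the class and member names are illustrative, not from the commit):

    #include <mutex>

    class Example
    {
        using ScopedLockType = std::lock_guard<std::recursive_mutex>;
        std::recursive_mutex m_mutex;

        // Always called with m_mutex held. The otherwise unused
        // parameter reminds callers to lock first.
        void doAdvance (ScopedLockType&) { /* ... */ }

    public:
        void advanceThread ()
        {
            ScopedLockType sl (m_mutex);
            doAdvance (sl);
        }
    };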


@@ -82,6 +82,11 @@ void TransactionAcquire::done ()
uint256 const& hash (mHash);
std::shared_ptr <SHAMap> const& map (mMap);
auto const pap = &app_;
// Note that, when we're in the process of shutting down, addJob()
// may reject the request. If that happens then giveSet() will
// not be called. That's fine. According to David the giveSet() call
// just updates the consensus and related structures when we acquire
// a transaction set. No need to update them if we're shutting down.
app_.getJobQueue().addJob (jtTXN_DATA, "completeAcquire",
[pap, hash, map](Job&)
{


@@ -905,16 +905,20 @@ public:
if (timer == m_sweepTimer)
{
// VFALCO TODO Move all this into doSweep
m_jobQueue->addJob(
jtSWEEP, "sweep", [this] (Job&) { doSweep(); });
}
}
void doSweep ()
{
if (! config_->standalone())
{
boost::filesystem::space_info space =
boost::filesystem::space (config_->legacy ("database_path"));
// VFALCO TODO Give this magic constant a name and move it into a well documented header
//
if (space.available < (512 * 1024 * 1024))
constexpr std::uintmax_t bytes512M = 512 * 1024 * 1024;
if (space.available < (bytes512M))
{
JLOG(m_journal.fatal())
<< "Remaining free disk space is less than 512MB";
@@ -922,12 +926,6 @@ public:
}
}
m_jobQueue->addJob(jtSWEEP, "sweep", [this] (Job&) { doSweep(); });
}
}
void doSweep ()
{
// VFALCO NOTE Does the order of calls matter?
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.
@@ -943,7 +941,7 @@ public:
family().treecache().sweep();
cachedSLEs_.expire();
// VFALCO NOTE does the call to sweep() happen on another thread?
// Set timer to do another sweep later.
m_sweepTimer.setExpiration (
std::chrono::seconds {config_->getSize (siSweepInterval)});
}


@@ -25,8 +25,6 @@ namespace ripple {
NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent)
: Stoppable ("NodeStoreScheduler", parent)
, m_jobQueue (nullptr)
, m_taskCount (0)
{
}
@@ -48,16 +46,29 @@ void NodeStoreScheduler::onChildrenStopped ()
void NodeStoreScheduler::scheduleTask (NodeStore::Task& task)
{
++m_taskCount;
m_jobQueue->addJob (
if (!m_jobQueue->addJob (
jtWRITE,
"NodeObject::store",
[this, &task] (Job&) { doTask(task); });
[this, &task] (Job&) { doTask(task); }))
{
// Job not added, presumably because we're shutting down.
// Recover by executing the task synchronously.
doTask (task);
}
}
void NodeStoreScheduler::doTask (NodeStore::Task& task)
{
task.performScheduledTask ();
if ((--m_taskCount == 0) && isStopping())
// NOTE: It feels a bit off that there are two different methods that
// call stopped(): onChildrenStopped() and doTask(). There's a
// suspicion that, as long as the Stoppable tree is configured
// correctly, this call to stopped() in doTask() can never occur.
//
// However, until we increase our confidence that the suspicion is
// correct, we will leave this code in place.
if ((--m_taskCount == 0) && isStopping() && areChildrenStopped())
stopped();
}


@@ -49,8 +49,8 @@ public:
private:
void doTask (NodeStore::Task& task);
JobQueue* m_jobQueue;
std::atomic <int> m_taskCount;
JobQueue* m_jobQueue {nullptr};
std::atomic <int> m_taskCount {0};
};
} // ripple


@@ -44,7 +44,6 @@
#include <ripple/basics/UptimeTimer.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/core/DeadlineTimer.h>
#include <ripple/core/JobCounter.h>
#include <ripple/crypto/csprng.h>
#include <ripple/crypto/RFC1751.h>
#include <ripple/json/to_string.h>
@@ -218,8 +217,7 @@ public:
~NetworkOPsImp() override
{
jobCounter_.join();
// this clear() is necessary to ensure the shared_ptrs in this map get
// This clear() is necessary to ensure the shared_ptrs in this map get
// destroyed NOW because the objects in this map invoke methods on this
// class when they are destroyed
mRpcSubMap.clear();
@@ -486,9 +484,6 @@ public:
m_heartbeatTimer.cancel();
m_clusterTimer.cancel();
// Wait until all our in-flight Jobs are completed.
jobCounter_.join();
stopped ();
}
@@ -540,7 +535,6 @@ private:
DeadlineTimer m_heartbeatTimer;
DeadlineTimer m_clusterTimer;
JobCounter jobCounter_;
RCLConsensus mConsensus;
@@ -657,14 +651,14 @@ void NetworkOPsImp::onDeadlineTimer (DeadlineTimer& timer)
{
if (timer == m_heartbeatTimer)
{
m_job_queue.addCountedJob (
jtNETOP_TIMER, "NetOPs.heartbeat", jobCounter_,
m_job_queue.addJob (
jtNETOP_TIMER, "NetOPs.heartbeat",
[this] (Job&) { processHeartbeatTimer(); });
}
else if (timer == m_clusterTimer)
{
m_job_queue.addCountedJob (
jtNETOP_CLUSTER, "NetOPs.cluster", jobCounter_,
m_job_queue.addJob (
jtNETOP_CLUSTER, "NetOPs.cluster",
[this] (Job&) { processClusterTimer(); });
}
}
@@ -837,8 +831,8 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr<STTx const> const& iTrans
auto tx = std::make_shared<Transaction> (
trans, reason, app_);
m_job_queue.addCountedJob (
jtTRANSACTION, "submitTxn", jobCounter_,
m_job_queue.addJob (
jtTRANSACTION, "submitTxn",
[this, tx] (Job&) {
auto t = tx;
processTransaction(t, false, false, FailHard::no);
@@ -904,8 +898,8 @@ void NetworkOPsImp::doTransactionAsync (std::shared_ptr<Transaction> transaction
if (mDispatchState == DispatchState::none)
{
if (m_job_queue.addCountedJob (
jtBATCH, "transactionBatch", jobCounter_,
if (m_job_queue.addJob (
jtBATCH, "transactionBatch",
[this] (Job&) { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
@@ -939,8 +933,8 @@ void NetworkOPsImp::doTransactionSync (std::shared_ptr<Transaction> transaction,
if (mTransactions.size())
{
// More transactions need to be applied, but by another job.
if (m_job_queue.addCountedJob (
jtBATCH, "transactionBatch", jobCounter_,
if (m_job_queue.addJob (
jtBATCH, "transactionBatch",
[this] (Job&) { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
@@ -2466,8 +2460,8 @@ void NetworkOPsImp::reportFeeChange ()
// only schedule the job if something has changed
if (f != mLastFeeSummary)
{
m_job_queue.addCountedJob (
jtCLIENT, "reportFeeChange->pubServer", jobCounter_,
m_job_queue.addJob (
jtCLIENT, "reportFeeChange->pubServer",
[this] (Job&) { pubServer(); });
}
}
@@ -3307,10 +3301,6 @@ NetworkOPs::NetworkOPs (Stoppable& parent)
{
}
NetworkOPs::~NetworkOPs ()
{
}
//------------------------------------------------------------------------------


@@ -20,14 +20,14 @@
#ifndef RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#define RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#include <ripple/core/JobQueue.h>
#include <ripple/protocol/STValidation.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/consensus/RCLCxPeerPos.h>
#include <ripple/core/JobQueue.h>
#include <ripple/core/Stoppable.h>
#include <ripple/ledger/ReadView.h>
#include <ripple/net/InfoSub.h>
#include <ripple/protocol/STValidation.h>
#include <memory>
#include <ripple/core/Stoppable.h>
#include <deque>
#include <tuple>
@@ -96,7 +96,7 @@ public:
}
public:
virtual ~NetworkOPs () = 0;
~NetworkOPs () override = default;
//--------------------------------------------------------------------------
//


@@ -70,7 +70,7 @@ public:
beast::Journal journal);
// ripple_path_find semantics
// Completion function is called
// Completion function is called after path update is complete
PathRequest (
Application& app,
std::function <void (void)> const& completion,
@@ -83,6 +83,8 @@ public:
bool isNew ();
bool needsUpdate (bool newOnly, LedgerIndex index);
// Called when the PathRequest update is complete.
void updateComplete ();
std::pair<bool, Json::Value> doCreate (


@@ -23,6 +23,8 @@
#include <ripple/app/main/Application.h>
#include <ripple/basics/Log.h>
#include <ripple/core/JobQueue.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/resource/Fees.h>
#include <algorithm>
@@ -246,7 +248,12 @@ PathRequests::makeLegacyPathRequest(
else
{
insertPathRequest (req);
app_.getLedgerMaster().newPathRequest();
if (! app_.getLedgerMaster().newPathRequest())
{
// The newPathRequest failed. Tell the caller.
result.second = rpcError (rpcTOO_BUSY);
req.reset();
}
}
return std::move (result.second);


@@ -33,6 +33,7 @@ namespace ripple {
class PathRequests
{
public:
/** A collection of all PathRequest instances. */
PathRequests (Application& app,
beast::Journal journal, beast::insight::Collector::ptr const& collector)
: app_ (app)
@@ -43,6 +44,11 @@ public:
mFull = collector->make_event ("pathfind_full");
}
/** Update all of the contained PathRequest instances.
@param ledger Ledger we are pathfinding in.
@param shouldCancel Invocable that returns whether to cancel.
*/
void updateAll (std::shared_ptr<ReadView const> const& ledger,
Job::CancelCallback shouldCancel);


@@ -48,7 +48,9 @@ inline
JobQueue::Coro::
~Coro()
{
#ifndef NDEBUG
assert(finished_);
#endif
}
inline
@@ -64,7 +66,7 @@ yield() const
}
inline
void
bool
JobQueue::Coro::
post()
{
@@ -74,9 +76,31 @@ post()
}
// sp keeps 'this' alive
jq_.addJob(type_, name_,
if (jq_.addJob(type_, name_,
[this, sp = shared_from_this()](Job&)
{
resume();
}))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard<std::mutex> lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline
void
JobQueue::Coro::
resume()
{
{
std::lock_guard<std::mutex> lk(mutex_run_);
running_ = true;
}
{
std::lock_guard<std::mutex> lock(jq_.m_mutex);
--jq_.nSuspend_;
@@ -84,13 +108,44 @@ post()
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard<std::mutex> lock(mutex_);
assert (coro_);
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard<std::mutex> lk(mutex_run_);
running_ = false;
cv_.notify_all();
});
}
inline
bool
JobQueue::Coro::
runnable() const
{
return static_cast<bool>(coro_);
}
inline
void
JobQueue::Coro::
expectEarlyExit()
{
#ifndef NDEBUG
if (! finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. If you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard<std::mutex> lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline
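
Together, post() and resume() let a caller fall back to synchronous
completion when the JobQueue refuses new work. A hedged sketch, where
coro is a std::shared_ptr<JobQueue::Coro> obtained from postCoro():

    if (! coro->post())
    {
        // We're stopping, so no JobQueue thread will ever resume the
        // Coro. Resume it on this thread so it runs to completion.
        coro->resume();
    }

This is exactly the pattern doRipplePathFind adopts further down.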


@@ -20,6 +20,7 @@
#ifndef RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
#define RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/core/Job.h>
#include <boost/optional.hpp>
#include <atomic>
@@ -126,18 +127,31 @@ public:
/** Destructor verifies all in-flight jobs are complete. */
~JobCounter()
{
join();
using namespace std::chrono_literals;
join ("JobCounter", 1s, debugLog());
}
/** Returns once all counted in-flight Jobs are destroyed. */
void join()
/** Returns once all counted in-flight Jobs are destroyed.
@param name Name reported if join time exceeds wait.
@param wait If join() exceeds this duration report to Journal.
@param j Journal written to if wait is exceeded.
*/
void join (char const* name,
std::chrono::milliseconds wait, beast::Journal j)
{
std::unique_lock<std::mutex> lock {mutex_};
waitForJobs_ = true;
if (jobCount_ > 0)
{
allJobsDoneCond_.wait (
lock, [this] { return jobCount_ == 0; });
if (! allJobsDoneCond_.wait_for (
lock, wait, [this] { return jobCount_ == 0; }))
{
if (auto stream = j.error())
stream << name << " waiting for JobCounter::join().";
allJobsDoneCond_.wait (lock, [this] { return jobCount_ == 0; });
}
}
}
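
A small usage sketch of the new join() signature (the component name
and the Journal j are placeholders):

    using namespace std::chrono_literals;

    JobCounter counter;
    // ... wrap() lambdas and hand them to the JobQueue ...

    // Blocks until every counted Job is destroyed. If that takes
    // longer than one second, an error naming "MyComponent" is
    // written to j, and then the wait continues.
    counter.join ("MyComponent", 1s, j);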


@@ -97,9 +97,31 @@ public:
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of the coroutine function or at the statement
after the previous call to yield.
Undefined behavior if called consecutively without a corresponding yield.
Undefined behavior if called after the coroutine has completed
with a return (as opposed to a yield()).
Undefined behavior if post() or resume() called consecutively
without a corresponding yield.
@return true if the Coro's job is added to the JobQueue.
*/
void post();
bool post();
/** Resume coroutine execution.
Effects:
The coroutine continues execution from where it last left off
using this same thread.
Undefined behavior if called after the coroutine has completed
with a return (as opposed to a yield()).
Undefined behavior if resume() or post() called consecutively
without a corresponding yield.
*/
void resume();
/** Returns true if the Coro is still runnable (has not returned). */
bool runnable() const;
/** Once called, the Coro allows early exit without an assert. */
void expectEarlyExit();
/** Waits until coroutine returns from the user function. */
void join();
@@ -113,29 +135,23 @@ public:
/** Adds a job to the JobQueue.
@param t The type of job.
@param type The type of job.
@param name Name of the job.
@param func std::function with signature void (Job&). Called when the job is executed.
*/
void addJob (JobType type, std::string const& name, JobFunction const& func);
/** Adds a counted job to the JobQueue.
@param t The type of job.
@param name Name of the job.
@param counter JobCounter for counting the Job.
@param jobHandler Lambda with signature void (Job&). Called when the job is executed.
@return true if JobHandler added, false if JobCounter is already joined.
@return true if jobHandler added to queue.
*/
template <typename JobHandler>
bool addCountedJob (JobType type,
std::string const& name, JobCounter& counter, JobHandler&& jobHandler)
template <typename JobHandler,
typename = std::enable_if_t<std::is_same<decltype(
std::declval<JobHandler&&>()(std::declval<Job&>())), void>::value>>
bool addJob (JobType type,
std::string const& name, JobHandler&& jobHandler)
{
if (auto optionalCountedJob = counter.wrap (std::move (jobHandler)))
if (auto optionalCountedJob =
Stoppable::jobCounter().wrap (std::forward<JobHandler>(jobHandler)))
{
addJob (type, name, std::move (*optionalCountedJob));
return true;
return addRefCountedJob (
type, name, std::move (*optionalCountedJob));
}
return false;
}
@@ -145,9 +161,11 @@ public:
@param t The type of job.
@param name Name of the job.
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the job executes.
@return shared_ptr to posted Coro. nullptr if post was not successful.
*/
template <class F>
void postCoro (JobType t, std::string const& name, F&& f);
std::shared_ptr<Coro> postCoro (JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
@@ -227,6 +245,16 @@ private:
// Signals the service stopped if the stopped condition is met.
void checkStopped (std::lock_guard <std::mutex> const& lock);
// Adds a reference counted job to the JobQueue.
//
// param type The type of job.
// param name Name of the job.
// param func std::function with signature void (Job&). Called when the job is executed.
//
// return true if func added to queue.
bool addRefCountedJob (
JobType type, std::string const& name, JobFunction const& func);
// Signals an added Job for processing.
//
// Pre-conditions:
@@ -311,15 +339,15 @@ private:
other requests while the RPC command completes its work asynchronously.
postCoro() creates a Coro object. When the Coro ctor is called, and its
coro_ member is initialized(a boost::coroutines::pull_type), execution
coro_ member is initialized (a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note here
that construction of a boost pull_type automatically passes execution to the
coroutine. A pull_type object automatically generates a push_type that is
used as the as a parameter(do_yield) in the signature of the function the
passed as a parameter (do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, Coro::yield_ is assigned the push_type
parameter(do_yield) address and called(yield()) so we can return execution
parameter (do_yield) address and called (yield()) so we can return execution
back to the caller's stack.
postCoro() then calls Coro::post(), which schedules a job on the job
@@ -368,15 +396,23 @@ private:
namespace ripple {
template <class F>
void JobQueue::postCoro (JobType t, std::string const& name, F&& f)
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro (JobType t, std::string const& name, F&& f)
{
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
*/
auto const coro = std::make_shared<Coro>(
auto coro = std::make_shared<Coro>(
Coro_create_t{}, *this, t, name, std::forward<F>(f));
coro->post();
if (! coro->post())
{
// The Coro was not successfully posted. Disable it so its destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
return coro;
}
}
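
Callers of the reworked postCoro() are expected to check for nullptr.
A sketch of that contract (the job type and name are illustrative):

    auto coro = jq.postCoro (jtCLIENT, "example",
        [] (std::shared_ptr<JobQueue::Coro> const& c)
        {
            // ... work, possibly punctuated by c->yield() ...
        });
    if (! coro)
    {
        // We're shutting down. The coroutine never ran and never
        // will, so reject whatever request needed it.
    }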


@@ -23,6 +23,7 @@
#include <ripple/beast/core/LockFreeStack.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/beast/core/WaitableEvent.h>
#include <ripple/core/JobCounter.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
@@ -167,6 +168,30 @@ class RootStoppable;
when the last thread is about to exit it would call stopped().
@note A Stoppable may not be restarted.
The form of the Stoppable tree in the rippled application evolves as
the source code changes and reacts to new demands. As of March 2017
the Stoppable tree had this form:
@code
                                Application
                                     |
                +--------------------+--------------------+
                |                    |                    |
           LoadManager          SHAMapStore      NodeStoreScheduler
                                                          |
                                                      JobQueue
                                                          |
     +-----------+-----------+-----------+-----------+----+--------+
     |           |           |           |           |             |
     |      NetworkOPs       |    InboundLedgers     |        OrderbookDB
     |                       |                       |
  Overlay           InboundTransactions        LedgerMaster
     |                                               |
PeerFinder                                   LedgerCleaner
@endcode
*/
/** @{ */
class Stoppable
@@ -190,6 +215,9 @@ public:
/** Returns `true` if all children have stopped. */
bool areChildrenStopped () const;
/* JobQueue uses this method for Job counting. */
inline JobCounter& jobCounter ();
/** Sleep or wake up on stop.
@return `true` if we are stopping
@@ -282,9 +310,8 @@ private:
std::string m_name;
RootStoppable& m_root;
Child m_child;
std::atomic<bool> m_started;
std::atomic<bool> m_stopped;
std::atomic<bool> m_childrenStopped;
std::atomic<bool> m_stopped {false};
std::atomic<bool> m_childrenStopped {false};
Children m_children;
beast::WaitableEvent m_stoppedEvent;
};
@@ -296,7 +323,7 @@ class RootStoppable : public Stoppable
public:
explicit RootStoppable (std::string name);
~RootStoppable () = default;
~RootStoppable ();
bool isStopping() const;
@@ -326,6 +353,18 @@ public:
*/
void stop (beast::Journal j);
/** Return true if start() was ever called. */
bool started () const
{
return m_started;
}
/* JobQueue uses this method for Job counting. */
JobCounter& rootJobCounter ()
{
return jobCounter_;
}
/** Sleep or wake up on stop.
@return `true` if we are stopping
@@ -346,15 +385,24 @@ private:
*/
bool stopAsync(beast::Journal j);
std::atomic<bool> m_prepared;
std::atomic<bool> m_calledStop;
std::atomic<bool> m_prepared {false};
std::atomic<bool> m_started {false};
std::atomic<bool> m_calledStop {false};
std::mutex m_;
std::condition_variable c_;
JobCounter jobCounter_;
};
/** @} */
//------------------------------------------------------------------------------
JobCounter& Stoppable::jobCounter ()
{
return m_root.rootJobCounter();
}
//------------------------------------------------------------------------------
template <class Rep, class Period>
bool
RootStoppable::alertable_sleep_for(


@@ -67,8 +67,8 @@ JobQueue::collect ()
job_count = m_jobSet.size ();
}
void
JobQueue::addJob (JobType type, std::string const& name,
bool
JobQueue::addRefCountedJob (JobType type, std::string const& name,
JobFunction const& func)
{
assert (type != jtINVALID);
@@ -76,7 +76,7 @@ JobQueue::addJob (JobType type, std::string const& name,
auto iter (m_jobData.find (type));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return;
return false;
JobTypeData& data (iter->second);
@@ -108,6 +108,7 @@ JobQueue::addJob (JobType type, std::string const& name,
data.load (), func, m_cancelCallback)));
queueJob (*result.first, lock);
}
return true;
}
int


@@ -226,7 +226,13 @@ private:
running_ = true;
}
jobQueue_.addJob (jtWAL, "WAL", [this] (Job&) { checkpoint(); });
// If the Job is not added to the JobQueue then we're not running_.
if (! jobQueue_.addJob (
jtWAL, "WAL", [this] (Job&) { checkpoint(); }))
{
std::lock_guard <std::mutex> lock (mutex_);
running_ = false;
}
}
void checkpoint ()


@@ -26,9 +26,6 @@ Stoppable::Stoppable (std::string name, RootStoppable& root)
: m_name (std::move (name))
, m_root (root)
, m_child (this)
, m_started (false)
, m_stopped (false)
, m_childrenStopped (false)
{
}
@@ -36,9 +33,6 @@ Stoppable::Stoppable (std::string name, Stoppable& parent)
: m_name (std::move (name))
, m_root (parent.m_root)
, m_child (this)
, m_started (false)
, m_stopped (false)
, m_childrenStopped (false)
{
// Must not have stopping parent.
assert (! parent.isStopping());
@@ -48,8 +42,8 @@ Stoppable::Stoppable (std::string name, Stoppable& parent)
Stoppable::~Stoppable ()
{
// Children must be stopped.
assert (!m_started || m_childrenStopped);
// Either we must not have started, or Children must be stopped.
assert (!m_root.started() || m_childrenStopped);
}
bool Stoppable::isStopping() const
@@ -113,12 +107,13 @@ void Stoppable::stopAsyncRecursive (beast::Journal j)
auto const start = high_resolution_clock::now();
onStop ();
auto const ms = duration_cast<milliseconds>(
high_resolution_clock::now() - start).count();
high_resolution_clock::now() - start);
#ifdef NDEBUG
if (ms >= 10)
using namespace std::chrono_literals;
if (ms >= 10ms)
if (auto stream = j.fatal())
stream << m_name << "::onStop took " << ms << "ms";
stream << m_name << "::onStop took " << ms.count() << "ms";
#else
(void)ms;
#endif
@@ -159,11 +154,15 @@ void Stoppable::stopRecursive (beast::Journal j)
RootStoppable::RootStoppable (std::string name)
: Stoppable (std::move (name), *this)
, m_prepared (false)
, m_calledStop (false)
{
}
RootStoppable::~RootStoppable ()
{
using namespace std::chrono_literals;
jobCounter_.join(m_name.c_str(), 1s, debugLog());
}
bool RootStoppable::isStopping() const
{
return m_calledStop;
@@ -194,7 +193,7 @@ void RootStoppable::stop (beast::Journal j)
stopRecursive (j);
}
bool RootStoppable::stopAsync(beast::Journal j)
bool RootStoppable::stopAsync (beast::Journal j)
{
bool alreadyCalled;
{
@@ -211,6 +210,11 @@ bool RootStoppable::stopAsync(beast::Journal j)
stream << "Stoppable::stop called again";
return false;
}
// Wait until all in-flight JobQueue Jobs are completed.
using namespace std::chrono_literals;
jobCounter_.join (m_name.c_str(), 1s, j);
c_.notify_all();
stopAsyncRecursive(j);
return true;


@@ -93,11 +93,9 @@ public:
if (!mSending)
{
// Start a sending thread.
mSending = true;
JLOG (j_.info()) << "RPCCall::fromNetwork start";
m_jobQueue.addJob (
mSending = m_jobQueue.addJob (
jtCLIENT, "RPCSub::sendThread", [this] (Job&) {
sendThread();
});


@@ -55,8 +55,92 @@ Json::Value doRipplePathFind (RPC::Context& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest (
request, std::bind(&JobQueue::Coro::post, context.coro),
request,
[&context] () {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy {context.coro};
if (!coroCopy->post())
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
},
context.consumer, lpLedger, context.params);
if (request)
{


@@ -309,11 +309,20 @@ ServerHandlerImp::onRequest (Session& session)
return;
}
m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
[this, detach = session.detach()](std::shared_ptr<JobQueue::Coro> c)
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
[this, detachedSession](std::shared_ptr<JobQueue::Coro> coro)
{
processSession(detach, c);
processSession(detachedSession, coro);
});
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.
HTTPReply(503, "Service Unavailable",
makeOutput(*detachedSession), app_.journal("RPC"));
detachedSession->close(true);
return;
}
}
void
@@ -350,12 +359,12 @@ ServerHandlerImp::onWSMessage(
JLOG(m_journal.trace())
<< "Websocket received '" << jv << "'";
m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session = std::move(session),
jv = std::move(jv)](auto const& c)
auto const postResult = m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session, jv = std::move(jv)]
(std::shared_ptr<JobQueue::Coro> const& coro)
{
auto const jr =
this->processSession(session, c, jv);
this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
beast::multi_buffer sb(n);
@@ -365,6 +374,11 @@ ServerHandlerImp::onWSMessage(
StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
});
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.
session->close();
}
}
void


@@ -20,16 +20,22 @@
#include <BeastConfig.h>
#include <ripple/core/JobCounter.h>
#include <ripple/beast/unit_test.h>
#include <test/jtx/Env.h>
#include <atomic>
#include <chrono>
#include <thread>
namespace ripple {
namespace test {
//------------------------------------------------------------------------------
class JobCounter_test : public beast::unit_test::suite
{
// We're only using Env for its Journal.
jtx::Env env {*this};
beast::Journal j {env.app().journal ("JobCounter_test")};
void testWrap()
{
// Verify reference counting.
@@ -41,8 +47,8 @@ class JobCounter_test : public beast::unit_test::suite
// wrapped1 should be callable with a Job.
{
Job j;
(*wrapped1)(j);
Job job;
(*wrapped1)(job);
}
{
// Copy should increase reference count.
@@ -66,7 +72,8 @@ class JobCounter_test : public beast::unit_test::suite
BEAST_EXPECT (jobCounter.count() == 0);
// Join with 0 count should not stall.
jobCounter.join();
using namespace std::chrono_literals;
jobCounter.join("testWrap", 1ms, j);
// Wrapping a Job after join() should return boost::none.
BEAST_EXPECT (jobCounter.wrap ([] (Job&) {}) == boost::none);
@@ -83,21 +90,22 @@ class JobCounter_test : public beast::unit_test::suite
// Calling join() now should stall, so do it on a different thread.
std::atomic<bool> threadExited {false};
std::thread localThread ([&jobCounter, &threadExited] ()
std::thread localThread ([&jobCounter, &threadExited, this] ()
{
// Should stall after calling join.
jobCounter.join();
using namespace std::chrono_literals;
jobCounter.join("testWaitOnJoin", 1ms, j);
threadExited.store (true);
});
// Wait for the thread to call jobCounter.join().
while (! jobCounter.joined());
// The thread should still be active after waiting a millisecond.
// The thread should still be active after waiting 5 milliseconds.
// This is not a guarantee that join() stalled the thread, but it
// improves confidence.
using namespace std::chrono_literals;
std::this_thread::sleep_for (1ms);
std::this_thread::sleep_for (5ms);
BEAST_EXPECT (threadExited == false);
// Destroy the Job and expect the thread to exit (asynchronously).
@@ -119,4 +127,5 @@ public:
BEAST_DEFINE_TESTSUITE(JobCounter, core, ripple);
}
} // test
} // ripple


@@ -0,0 +1,154 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2017 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/core/JobQueue.h>
#include <ripple/beast/unit_test.h>
#include <test/jtx/Env.h>
namespace ripple {
namespace test {
//------------------------------------------------------------------------------
class JobQueue_test : public beast::unit_test::suite
{
void testAddJob()
{
jtx::Env env {*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// addJob() should run the Job (and return true).
std::atomic<bool> jobRan {false};
BEAST_EXPECT (jQueue.addJob (jtCLIENT, "JobAddTest1",
[&jobRan] (Job&) { jobRan = true; }) == true);
// Wait for the Job to run.
while (jobRan == false);
}
{
// If the JobQueue's JobCounter is join()ed we should no
// longer be able to add Jobs (and calling addJob() should
// return false).
using namespace std::chrono_literals;
beast::Journal j {env.app().journal ("JobQueue_test")};
JobCounter& jCounter = jQueue.jobCounter();
jCounter.join("JobQueue_test", 1s, j);
// The Job should never run, so having the Job access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
BEAST_EXPECT (jQueue.addJob (jtCLIENT, "JobAddTest2",
[&unprotected] (Job&) { unprotected = false; }) == false);
}
}
void testPostCoro()
{
jtx::Env env {*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// Test repeated post()s until the Coro completes.
std::atomic<int> yieldCount {0};
auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest1",
[&yieldCount] (std::shared_ptr<JobQueue::Coro> const& coroCopy)
{
while (++yieldCount < 4)
coroCopy->yield();
});
BEAST_EXPECT (coro != nullptr);
// Wait for the Job to run and yield.
while (yieldCount == 0);
// Now re-post until the Coro says it is done.
int old = yieldCount;
while (coro->runnable())
{
BEAST_EXPECT (coro->post());
while (old == yieldCount) { }
coro->join();
BEAST_EXPECT (++old == yieldCount);
}
BEAST_EXPECT (yieldCount == 4);
}
{
// Test repeated resume()s until the Coro completes.
int yieldCount {0};
auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest2",
[&yieldCount] (std::shared_ptr<JobQueue::Coro> const& coroCopy)
{
while (++yieldCount < 4)
coroCopy->yield();
});
if (! coro)
{
// There's no good reason we should not get a Coro, but we
// can't continue without one.
BEAST_EXPECT (false);
return;
}
// Wait for the Job to run and yield.
coro->join();
// Now resume until the Coro says it is done.
int old = yieldCount;
while (coro->runnable())
{
coro->resume(); // Resume runs synchronously on this thread.
BEAST_EXPECT (++old == yieldCount);
}
BEAST_EXPECT (yieldCount == 4);
}
{
// If the JobQueue's JobCounter is join()ed we should no
// longer be able to add a Coro (and calling postCoro() should
// return false).
using namespace std::chrono_literals;
beast::Journal j {env.app().journal ("JobQueue_test")};
JobCounter& jCounter = jQueue.jobCounter();
jCounter.join("JobQueue_test", 1s, j);
// The Coro should never run, so having the Coro access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
auto const coro = jQueue.postCoro (jtCLIENT, "PostCoroTest3",
[&unprotected] (std::shared_ptr<JobQueue::Coro> const&)
{ unprotected = false; });
BEAST_EXPECT (coro == nullptr);
}
}
public:
void run()
{
testAddJob();
testPostCoro();
}
};
BEAST_DEFINE_TESTSUITE(JobQueue, core, ripple);
} // test
} // ripple


@@ -23,6 +23,7 @@
#include <test/core/CryptoPRNG_test.cpp>
#include <test/core/DeadlineTimer_test.cpp>
#include <test/core/JobCounter_test.cpp>
#include <test/core/JobQueue_test.cpp>
#include <test/core/SociDB_test.cpp>
#include <test/core/Stoppable_test.cpp>
#include <test/core/TerminateHandler_test.cpp>