From c2a08a1f263abcf41e16f147d0c04e680db01dcf Mon Sep 17 00:00:00 2001 From: John Freeman Date: Tue, 16 Nov 2021 10:55:52 -0600 Subject: [PATCH] Simplify the Job Queue: This is a refactor aimed at cleaning up and simplifying the existing job queue. As of now, all jobs are cancelled at the same time and in the same way, so this commit removes the per-job cancellation token. If the need for such support is demonstrated, support can be re-added. * Revise documentation for ClosureCounter and Workers. * Simplify code, removing unnecessary function arguments and deduplicating expressions * Restructure job handlers to no longer need to pass a job's handle to the job. --- src/ripple/app/consensus/RCLConsensus.cpp | 8 +-- src/ripple/app/consensus/RCLValidations.cpp | 2 +- src/ripple/app/ledger/ConsensusTransSetSF.cpp | 7 +- src/ripple/app/ledger/Ledger.cpp | 7 +- src/ripple/app/ledger/LedgerMaster.h | 4 +- src/ripple/app/ledger/OrderBookDB.cpp | 2 +- src/ripple/app/ledger/impl/InboundLedger.cpp | 2 +- src/ripple/app/ledger/impl/InboundLedgers.cpp | 4 +- .../app/ledger/impl/LedgerDeltaAcquire.cpp | 2 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 21 +++--- src/ripple/app/ledger/impl/TimeoutCounter.cpp | 2 +- .../app/ledger/impl/TransactionAcquire.cpp | 2 +- src/ripple/app/main/Application.cpp | 2 +- src/ripple/app/main/NodeStoreScheduler.cpp | 2 +- src/ripple/app/misc/NetworkOPs.cpp | 24 ++++--- src/ripple/app/paths/PathRequests.cpp | 8 +-- src/ripple/app/paths/PathRequests.h | 5 +- src/ripple/core/ClosureCounter.h | 16 ++--- src/ripple/core/Coro.ipp | 2 +- src/ripple/core/Job.h | 25 +------ src/ripple/core/JobQueue.h | 22 +------ src/ripple/core/impl/Job.cpp | 29 +-------- src/ripple/core/impl/JobQueue.cpp | 65 ++++++++----------- src/ripple/core/impl/SociDB.cpp | 2 +- src/ripple/core/impl/Workers.h | 31 ++++++--- src/ripple/net/impl/RPCSub.cpp | 2 +- src/ripple/overlay/impl/PeerImp.cpp | 28 ++++---- src/ripple/overlay/impl/PeerImp.h | 2 +- src/ripple/rpc/impl/ShardArchiveHandler.cpp | 65 +++++++++---------- src/test/core/Coroutine_test.cpp | 4 +- src/test/core/JobQueue_test.cpp | 9 ++- 31 files changed, 164 insertions(+), 242 deletions(-) diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 859c66d315..be8f2af8ca 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -131,9 +131,7 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash) acquiringLedger_ = hash; app_.getJobQueue().addJob( - jtADVANCE, - "getConsensusLedger", - [id = hash, &app = app_](Job&) { + jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() { app.getInboundLedgers().acquire( id, 0, InboundLedger::Reason::CONSENSUS); }); @@ -423,9 +421,7 @@ RCLConsensus::Adaptor::onAccept( Json::Value&& consensusJson) { app_.getJobQueue().addJob( - jtACCEPT, - "acceptLedger", - [=, cj = std::move(consensusJson)](auto&) mutable { + jtACCEPT, "acceptLedger", [=, cj = std::move(consensusJson)]() mutable { // Note that no lock is held or acquired during this job. 
// This is because generic Consensus guarantees that once a ledger // is accepted, the consensus results and capture by reference state diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp index 0fcf0660bf..ab9391385d 100644 --- a/src/ripple/app/consensus/RCLValidations.cpp +++ b/src/ripple/app/consensus/RCLValidations.cpp @@ -135,7 +135,7 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash) Application* pApp = &app_; app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [pApp, hash](Job&) { + jtADVANCE, "getConsensusLedger", [pApp, hash]() { pApp->getInboundLedgers().acquire( hash, 0, InboundLedger::Reason::CONSENSUS); }); diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.cpp b/src/ripple/app/ledger/ConsensusTransSetSF.cpp index 881424ff1b..997a2aee14 100644 --- a/src/ripple/app/ledger/ConsensusTransSetSF.cpp +++ b/src/ripple/app/ledger/ConsensusTransSetSF.cpp @@ -62,10 +62,9 @@ ConsensusTransSetSF::gotNode( auto stx = std::make_shared<STTx const>(std::ref(sit)); assert(stx->getTransactionID() == nodeHash.as_uint256()); auto const pap = &app_; - app_.getJobQueue().addJob( - jtTRANSACTION, "TXS->TXN", [pap, stx](Job&) { - pap->getOPs().submitTransaction(stx); - }); + app_.getJobQueue().addJob(jtTRANSACTION, "TXS->TXN", [pap, stx]() { + pap->getOPs().submitTransaction(stx); + }); } catch (std::exception const&) { diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp index 4a12a39992..cac9c40bb7 100644 --- a/src/ripple/app/ledger/Ledger.cpp +++ b/src/ripple/app/ledger/Ledger.cpp @@ -981,10 +981,9 @@ pendSaveValidated( // See if we can use the JobQueue. if (!isSynchronous && - app.getJobQueue().addJob( - jobType, jobName, [&app, ledger, isCurrent](Job&) { - saveValidatedLedger(app, ledger, isCurrent); - })) + app.getJobQueue().addJob(jobType, jobName, [&app, ledger, isCurrent]() { + saveValidatedLedger(app, ledger, isCurrent); + })) { return true; } diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index eb296f0deb..dbb01f54a4 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -301,7 +301,7 @@ private: setPubLedger(std::shared_ptr<Ledger const> const& l); void - tryFill(Job& job, std::shared_ptr<Ledger const> ledger); + tryFill(std::shared_ptr<Ledger const> ledger); void getFetchPack(LedgerIndex missing, InboundLedger::Reason reason); @@ -326,7 +326,7 @@ private: findNewLedgersToPublish(std::unique_lock<std::recursive_mutex>&); void - updatePaths(Job& job); + updatePaths(); // Returns true if work started. Always called with m_mutex locked. // The passed lock is a reminder to callers.
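The handler-signature change is the same at every call site in this patch: the unused Job& parameter disappears, and long-running work polls the queue-wide stop flag instead of a per-job cancellation token. A minimal before/after sketch (illustrative only; `app` stands for any Application& in scope, and `doWork` is a placeholder):

    // Before: handlers received the Job& and could poll job.shouldCancel().
    app.getJobQueue().addJob(jtCLIENT, "example", [](Job& job) {
        if (!job.shouldCancel())
            doWork();
    });

    // After: handlers take no arguments; cancellation is queue-wide.
    app.getJobQueue().addJob(jtCLIENT, "example", [&app]() {
        if (!app.getJobQueue().isStopping())
            doWork();
    });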
diff --git a/src/ripple/app/ledger/OrderBookDB.cpp b/src/ripple/app/ledger/OrderBookDB.cpp index 4b598de1b4..b9f72b7152 100644 --- a/src/ripple/app/ledger/OrderBookDB.cpp +++ b/src/ripple/app/ledger/OrderBookDB.cpp @@ -68,7 +68,7 @@ OrderBookDB::setup(std::shared_ptr const& ledger) update(ledger); else app_.getJobQueue().addJob( - jtUPDATE_PF, "OrderBookDB::update", [this, ledger](Job&) { + jtUPDATE_PF, "OrderBookDB::update", [this, ledger]() { update(ledger); }); } diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index f961d28b76..979c145441 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -527,7 +527,7 @@ InboundLedger::done() // We hold the PeerSet lock, so must dispatch app_.getJobQueue().addJob( - jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()](Job&) { + jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() { if (self->complete_ && !self->failed_) { self->app_.getLedgerMaster().checkAccept(self->getLedger()); diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 8ee3443a27..76681ea0a9 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -183,7 +183,7 @@ public: // dispatch if (ledger->gotData(std::weak_ptr(peer), packet)) app_.getJobQueue().addJob( - jtLEDGER_DATA, "processLedgerData", [ledger](Job&) { + jtLEDGER_DATA, "processLedgerData", [ledger]() { ledger->runData(); }); @@ -198,7 +198,7 @@ public: if (packet->type() == protocol::liAS_NODE) { app_.getJobQueue().addJob( - jtLEDGER_DATA, "gotStaleData", [this, packet](Job&) { + jtLEDGER_DATA, "gotStaleData", [this, packet]() { gotStaleData(packet); }); } diff --git a/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp b/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp index 122915eed9..3c19c6ee15 100644 --- a/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp +++ b/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp @@ -240,7 +240,7 @@ LedgerDeltaAcquire::onLedgerBuilt( app_.getJobQueue().addJob( jtREPLAY_TASK, "onLedgerBuilt", - [=, ledger = this->fullLedger_, &app = this->app_](Job&) { + [=, ledger = this->fullLedger_, &app = this->app_]() { for (auto reason : reasons) { switch (reason) diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 4cc1958154..0dd0ba1eec 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -699,7 +699,7 @@ LedgerMaster::getEarliestFetch() } void -LedgerMaster::tryFill(Job& job, std::shared_ptr ledger) +LedgerMaster::tryFill(std::shared_ptr ledger) { std::uint32_t seq = ledger->info().seq; uint256 prevHash = ledger->info().parentHash; @@ -710,7 +710,7 @@ LedgerMaster::tryFill(Job& job, std::shared_ptr ledger) std::uint32_t maxHas = seq; NodeStore::Database& nodeStore{app_.getNodeStore()}; - while (!job.shouldCancel() && seq > 0) + while (!app_.getJobQueue().isStopping() && seq > 0) { { std::lock_guard ml(m_mutex); @@ -1453,7 +1453,7 @@ LedgerMaster::tryAdvance() if (!mAdvanceThread && !mValidLedger.empty()) { mAdvanceThread = true; - app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this](Job&) { + app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() { std::unique_lock sl(m_mutex); assert(!mValidLedger.empty() && mAdvanceThread); @@ -1476,7 +1476,7 @@ LedgerMaster::tryAdvance() } void -LedgerMaster::updatePaths(Job& job) 
+LedgerMaster::updatePaths() { { std::lock_guard ml(m_mutex); @@ -1487,7 +1487,7 @@ LedgerMaster::updatePaths(Job& job) } } - while (!job.shouldCancel()) + while (!app_.getJobQueue().isStopping()) { std::shared_ptr lastLedger; { @@ -1527,8 +1527,7 @@ LedgerMaster::updatePaths(Job& job) try { - app_.getPathRequests().updateAll( - lastLedger, job.getCancelCallback()); + app_.getPathRequests().updateAll(lastLedger); } catch (SHAMapMissingNode const& mn) { @@ -1591,7 +1590,7 @@ LedgerMaster::newPFWork( if (mPathFindThread < 2) { if (app_.getJobQueue().addJob( - jtUPDATE_PF, name, [this](Job& j) { updatePaths(j); })) + jtUPDATE_PF, name, [this]() { updatePaths(); })) { ++mPathFindThread; } @@ -1942,8 +1941,8 @@ LedgerMaster::fetchForHistory( mFillInProgress = seq; } app_.getJobQueue().addJob( - jtADVANCE, "tryFill", [this, ledger](Job& j) { - tryFill(j, ledger); + jtADVANCE, "tryFill", [this, ledger]() { + tryFill(ledger); }); } } @@ -2124,7 +2123,7 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq) { if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire)) { - app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&](Job&) { + app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() { app_.getInboundLedgers().gotFetchPack(); mGotFetchPackThread.clear(std::memory_order_release); }); diff --git a/src/ripple/app/ledger/impl/TimeoutCounter.cpp b/src/ripple/app/ledger/impl/TimeoutCounter.cpp index ff43a0e1af..9ea20c0638 100644 --- a/src/ripple/app/ledger/impl/TimeoutCounter.cpp +++ b/src/ripple/app/ledger/impl/TimeoutCounter.cpp @@ -83,7 +83,7 @@ TimeoutCounter::queueJob(ScopedLockType& sl) app_.getJobQueue().addJob( queueJobParameter_.jobType, queueJobParameter_.jobName, - [wptr = pmDowncast()](Job&) { + [wptr = pmDowncast()]() { if (auto sptr = wptr.lock(); sptr) sptr->invokeOnTimer(); }); diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp index 3cd4a301e1..7d958cba86 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp +++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp @@ -81,7 +81,7 @@ TransactionAcquire::done() // just updates the consensus and related structures when we acquire // a transaction set. No need to update them if we're shutting down. app_.getJobQueue().addJob( - jtTXN_DATA, "completeAcquire", [pap, hash, map](Job&) { + jtTXN_DATA, "completeAcquire", [pap, hash, map]() { pap->getInboundTransactions().giveSet(hash, map, true); }); } diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 8040decea6..ac0b6c947a 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1085,7 +1085,7 @@ public: if (e.value() == boost::system::errc::success) { m_jobQueue->addJob( - jtSWEEP, "sweep", [this](Job&) { doSweep(); }); + jtSWEEP, "sweep", [this]() { doSweep(); }); } // Recover as best we can if an unexpected error occurs. 
if (e.value() != boost::system::errc::success && diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp index 0e6c14adbb..0ac8909641 100644 --- a/src/ripple/app/main/NodeStoreScheduler.cpp +++ b/src/ripple/app/main/NodeStoreScheduler.cpp @@ -32,7 +32,7 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task) if (jobQueue_.isStopped()) return; - if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task](Job&) { + if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task]() { task.performScheduledTask(); })) { diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 574ce040bf..2b9c5f316b 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -949,7 +949,7 @@ NetworkOPsImp::setHeartbeatTimer() heartbeatTimer_, mConsensus.parms().ledgerGRANULARITY, [this]() { - m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) { + m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { processHeartbeatTimer(); }); }, @@ -964,7 +964,7 @@ NetworkOPsImp::setClusterTimer() clusterTimer_, 10s, [this]() { - m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) { + m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() { processClusterTimer(); }); }, @@ -1153,7 +1153,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr const& iTrans) auto tx = std::make_shared(trans, reason, app_); - m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx](Job&) { + m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { auto t = tx; processTransaction(t, false, false, FailHard::no); }); @@ -1224,9 +1224,8 @@ NetworkOPsImp::doTransactionAsync( if (mDispatchState == DispatchState::none) { - if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this](Job&) { - transactionBatch(); - })) + if (m_job_queue.addJob( + jtBATCH, "transactionBatch", [this]() { transactionBatch(); })) { mDispatchState = DispatchState::scheduled; } @@ -1262,10 +1261,9 @@ NetworkOPsImp::doTransactionSync( if (mTransactions.size()) { // More transactions need to be applied, but by another job. 
- if (m_job_queue.addJob( - jtBATCH, "transactionBatch", [this](Job&) { - transactionBatch(); - })) + if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() { + transactionBatch(); + })) { mDispatchState = DispatchState::scheduled; } @@ -2941,7 +2939,7 @@ NetworkOPsImp::reportFeeChange() if (f != mLastFeeSummary) { m_job_queue.addJob( - jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this](Job&) { + jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() { pubServer(); }); } @@ -2953,7 +2951,7 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase) m_job_queue.addJob( jtCLIENT_CONSENSUS, "reportConsensusStateChange->pubConsensus", - [this, phase](Job&) { pubConsensus(phase); }); + [this, phase]() { pubConsensus(phase); }); } inline void @@ -3346,7 +3344,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo) app_.getJobQueue().addJob( jtCLIENT_ACCT_HIST, "AccountHistoryTxStream", - [this, dbType = databaseType, subInfo](Job&) { + [this, dbType = databaseType, subInfo]() { auto const& accountId = subInfo.index_->accountId_; auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_; auto& txHistoryIndex = subInfo.index_->historyTxIndex_; diff --git a/src/ripple/app/paths/PathRequests.cpp b/src/ripple/app/paths/PathRequests.cpp index fe78f360fe..50e591eb1b 100644 --- a/src/ripple/app/paths/PathRequests.cpp +++ b/src/ripple/app/paths/PathRequests.cpp @@ -55,9 +55,7 @@ PathRequests::getLineCache( } void -PathRequests::updateAll( - std::shared_ptr const& inLedger, - Job::CancelCallback shouldCancel) +PathRequests::updateAll(std::shared_ptr const& inLedger) { auto event = app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll"); @@ -84,7 +82,7 @@ PathRequests::updateAll( { for (auto const& wr : requests) { - if (shouldCancel()) + if (app_.getJobQueue().isStopping()) break; auto request = wr.lock(); @@ -174,7 +172,7 @@ PathRequests::updateAll( requests = requests_; cache = getLineCache(cache->getLedger(), false); } - } while (!shouldCancel()); + } while (!app_.getJobQueue().isStopping()); JLOG(mJournal.debug()) << "updateAll complete: " << processed << " processed and " << removed << " removed"; diff --git a/src/ripple/app/paths/PathRequests.h b/src/ripple/app/paths/PathRequests.h index d95ab9c5d4..75b852867a 100644 --- a/src/ripple/app/paths/PathRequests.h +++ b/src/ripple/app/paths/PathRequests.h @@ -47,12 +47,9 @@ public: /** Update all of the contained PathRequest instances. @param ledger Ledger we are pathfinding in. - @param shouldCancel Invocable that returns whether to cancel. */ void - updateAll( - std::shared_ptr const& ledger, - Job::CancelCallback shouldCancel); + updateAll(std::shared_ptr const& ledger); std::shared_ptr getLineCache( diff --git a/src/ripple/core/ClosureCounter.h b/src/ripple/core/ClosureCounter.h index 7c0d36d9f3..9f5bc3ee61 100644 --- a/src/ripple/core/ClosureCounter.h +++ b/src/ripple/core/ClosureCounter.h @@ -31,21 +31,21 @@ namespace ripple { /** * The role of a `ClosureCounter` is to assist in shutdown by letting callers - * wait for the completion of callbacks (of a single type signature) that they - * previously scheduled. The lifetime of a `ClosureCounter` consists of two + * wait for the completion of closures (of a specific type signature) that they + * previously registered. These closures are typically callbacks for + * asynchronous operations. The lifetime of a `ClosureCounter` consists of two * phases: the initial expanding "fork" phase, and the subsequent shrinking * "join" phase. 
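 * A minimal usage sketch, in terms of the two phases described below
 * (illustrative only; `doWork` and `scheduleLater` are placeholders, and
 * the `join` arguments are elided):
 *
 *     ClosureCounter<void> counter;
 *     if (auto substitute = counter.wrap([] { doWork(); }))
 *         scheduleLater(*substitute);  // fork phase: run the substitute
 *                                      // later, from any thread
 *     // Once join() has been called, wrap() returns a null substitute
 *     // and the caller should cancel the operation instead.
 *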
* - * In the fork phase, callers register a callback by passing the callback and + * In the fork phase, callers register a closure by passing the closure and * receiving a substitute in return. The substitute has the same callable - * interface as the callback, and it informs the `ClosureCounter` whenever it + * interface as the closure, and it informs the `ClosureCounter` whenever it * is copied or destroyed, so that it can keep an accurate count of copies. * * The transition to the join phase is made by a call to `join`. In this - * phase, every substitute returned going forward will be empty, signaling to - * the caller that they should just drop the callback and cancel their - * asynchronous operation. `join` blocks until all existing callback - * substitutes are destroyed. + * phase, every substitute returned going forward will be null, signaling to + * the caller that they should drop the closure and cancel their operation. + * `join` blocks until all existing closure substitutes are destroyed. * * \tparam Ret_t The return type of the closure. * \tparam Args_t The argument types of the closure. diff --git a/src/ripple/core/Coro.ipp b/src/ripple/core/Coro.ipp index 0234f90926..2b07c5a458 100644 --- a/src/ripple/core/Coro.ipp +++ b/src/ripple/core/Coro.ipp @@ -77,7 +77,7 @@ JobQueue::Coro::post() // sp keeps 'this' alive if (jq_.addJob( - type_, name_, [this, sp = shared_from_this()](Job&) { resume(); })) + type_, name_, [this, sp = shared_from_this()]() { resume(); })) { return true; } diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 66aa0d051f..0f3bb718bb 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -109,43 +109,25 @@ public: // Job(); - // Job (Job const& other); Job(JobType type, std::uint64_t index); - /** A callback used to check for canceling a job. */ - using CancelCallback = std::function<bool(void)>; - // VFALCO TODO try to remove the dependency on LoadMonitor. Job(JobType type, std::string const& name, std::uint64_t index, LoadMonitor& lm, - std::function<void(Job&)> const& job, - CancelCallback cancelCallback); - - // Job& operator= (Job const& other); + std::function<void()> const& job); JobType getType() const; - CancelCallback - getCancelCallback() const; - /** Returns the time when the job was queued. */ clock_type::time_point const& queue_time() const; - /** Returns `true` if the running job should make a best-effort cancel.
*/ - bool - shouldCancel() const; - void doJob(); - void - rename(std::string const& n); - // These comparison operators make the jobs sort in priority order // in the job set bool @@ -158,16 +140,15 @@ public: operator>=(const Job& j) const; private: - CancelCallback m_cancelCallback; JobType mType; std::uint64_t mJobIndex; - std::function<void(Job&)> mJob; + std::function<void()> mJob; std::shared_ptr<LoadEvent> m_loadEvent; std::string mName; clock_type::time_point m_queue_time; }; -using JobCounter = ClosureCounter<void, Job&>; +using JobCounter = ClosureCounter<void>; } // namespace ripple diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index a93d68b859..a9d541baa7 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -138,7 +138,7 @@ public: join(); }; - using JobFunction = std::function<void(Job&)>; + using JobFunction = std::function<void()>; JobQueue( int threadCount, @@ -160,7 +160,7 @@ public: template < typename JobHandler, typename = std::enable_if_t<std::is_same< - decltype(std::declval<JobHandler&&>()(std::declval<Job&>())), + decltype(std::declval<JobHandler&&>()()), void>::value>> bool addJob(JobType type, std::string const& name, JobHandler&& jobHandler) @@ -259,7 +259,6 @@ private: int nSuspend_ = 0; Workers m_workers; - Job::CancelCallback m_cancelCallback; // Statistics tracking perf::PerfLog& perfLog_; @@ -288,23 +287,6 @@ private: std::string const& name, JobFunction const& func); - // Signals an added Job for processing. - // - // Pre-conditions: - // The JobType must be valid. - // The Job must exist in mJobSet. - // The Job must not have previously been queued. - // - // Post-conditions: - // Count of waiting jobs of that type will be incremented. - // If JobQueue exists, and has at least one thread, Job will eventually - // run. - // - // Invariants: - // The calling thread owns the JobLock - void - queueJob(Job const& job, std::lock_guard<std::mutex> const& lock); - // Returns the next Job we should run now.
// // RunnableJob: diff --git a/src/ripple/core/impl/Job.cpp b/src/ripple/core/impl/Job.cpp index a9b82ccf33..780a9f49cd 100644 --- a/src/ripple/core/impl/Job.cpp +++ b/src/ripple/core/impl/Job.cpp @@ -36,10 +36,8 @@ Job::Job( std::string const& name, std::uint64_t index, LoadMonitor& lm, - std::function<void(Job&)> const& job, - CancelCallback cancelCallback) - : m_cancelCallback(cancelCallback) - , mType(type) + std::function<void()> const& job) + : mType(type) , mJobIndex(index) , mJob(job) , mName(name) @@ -54,27 +52,12 @@ Job::getType() const return mType; } -Job::CancelCallback -Job::getCancelCallback() const -{ - assert(m_cancelCallback); - return m_cancelCallback; -} - Job::clock_type::time_point const& Job::queue_time() const { return m_queue_time; } -bool -Job::shouldCancel() const -{ - if (m_cancelCallback) - return m_cancelCallback(); - return false; -} - void Job::doJob() { @@ -82,19 +65,13 @@ Job::doJob() m_loadEvent->start(); m_loadEvent->setName(mName); - mJob(*this); + mJob(); // Destroy the lambda, otherwise we won't include // its duration in the time measurement mJob = nullptr; } -void -Job::rename(std::string const& newName) -{ - mName = newName; -} - bool Job::operator>(const Job& j) const { diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index 85dcec1ca6..28947300cc 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -35,7 +35,6 @@ JobQueue::JobQueue( , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs) , m_processCount(0) , m_workers(*this, &perfLog, "JobQueue", threadCount) - , m_cancelCallback(std::bind(&JobQueue::isStopping, this)) , perfLog_(perfLog) , m_collector(collector) { @@ -100,9 +99,27 @@ JobQueue::addRefCountedJob( { std::lock_guard lock(m_mutex); - auto result = m_jobSet.emplace( - type, name, ++m_lastJob, data.load(), func, m_cancelCallback); - queueJob(*result.first, lock); + auto result = + m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func); + auto const& job = *result.first; + + JobType const type(job.getType()); + assert(type != jtINVALID); + assert(m_jobSet.find(job) != m_jobSet.end()); + perfLog_.jobQueue(type); + + JobTypeData& data(getJobTypeData(type)); + + if (data.waiting + data.running < getJobLimit(type)) + { + m_workers.addTask(); + } + else + { + // defer the task until we go below the limit + ++data.deferred; + } + ++data.waiting; } return true; } @@ -282,29 +299,6 @@ JobQueue::isStopped() const return stopped_; } -void -JobQueue::queueJob(Job const& job, std::lock_guard<std::mutex> const& lock) -{ - JobType const type(job.getType()); - assert(type != jtINVALID); - assert(m_jobSet.find(job) != m_jobSet.end()); - perfLog_.jobQueue(type); - - JobTypeData& data(getJobTypeData(type)); - - if (data.waiting + data.running < getJobLimit(type)) - { - m_workers.addTask(); - } - else - { - // defer the task until we go below the limit - // - ++data.deferred; - } - ++data.waiting; -} - void JobQueue::getNextJob(Job& job) { @@ -313,30 +307,25 @@ JobQueue::getNextJob(Job& job) std::set<Job>::const_iterator iter; for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter) { - JobTypeData& data(getJobTypeData(iter->getType())); + JobType const type = iter->getType(); + assert(type != jtINVALID); - assert(data.running <= getJobLimit(data.type())); + JobTypeData& data(getJobTypeData(type)); + assert(data.running <= getJobLimit(type)); // Run this job if we're running below the limit.
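// (Accounting note: every queued Job is counted in exactly one of
// data.waiting or data.running, so the decrement/increment just below
// moves the selected Job from the waiting bucket to the running bucket
// while the queue lock is still held.)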
if (data.running < getJobLimit(data.type())) { assert(data.waiting > 0); + --data.waiting; + ++data.running; break; } } assert(iter != m_jobSet.end()); - - JobType const type = iter->getType(); - JobTypeData& data(getJobTypeData(type)); - - assert(type != jtINVALID); - job = *iter; m_jobSet.erase(iter); - - --data.waiting; - ++data.running; } void diff --git a/src/ripple/core/impl/SociDB.cpp b/src/ripple/core/impl/SociDB.cpp index 419784c129..2d13326360 100644 --- a/src/ripple/core/impl/SociDB.cpp +++ b/src/ripple/core/impl/SociDB.cpp @@ -254,7 +254,7 @@ public: // There is a separate check in `checkpoint` for a valid // connection in the rare case when the DatabaseCon is destroyed // after locking this weak_ptr - [wp = std::weak_ptr{shared_from_this()}](Job&) { + [wp = std::weak_ptr{shared_from_this()}]() { if (auto self = wp.lock()) self->checkpoint(); })) diff --git a/src/ripple/core/impl/Workers.h b/src/ripple/core/impl/Workers.h index 0abbbe429c..88ab8fa27e 100644 --- a/src/ripple/core/impl/Workers.h +++ b/src/ripple/core/impl/Workers.h @@ -37,24 +37,35 @@ class PerfLog; /** * `Workers` is effectively a thread pool. The constructor takes a "callback" * that has a `void processTask(int instance)` method, and a number of - * workers. It creates that many Workers and then waits for calls to + * workers. It creates that many `Worker`s and then waits for calls to * `Workers::addTask()`. It holds a semaphore that counts the number of - * waiting tasks, and a condition variable for the event when the last worker - * pauses itself. + * pending "tasks", and a condition variable for the event when the last + * worker pauses itself. + * + * A "task" is just a call to the callback's `processTask` method. + * "Adding a task" means calling that method now, or remembering to call it in + * the future. + * This is implemented with a semaphore. + * If there are any workers waiting when a task is added, then one will be + * woken to claim the task. + * If not, then the next worker to wait on the semaphore will claim the task. * * Creating a `Worker` creates a thread that calls `Worker::run()`. When that * thread enters `Worker::run`, it increments the count of active workers in - * the parent `Workers` object and then blocks on the semaphore if there are - * no waiting tasks. It will be unblocked whenever the number of waiting tasks - * is incremented. That only happens in two circumstances: (1) when + * the parent `Workers` object and then tries to claim a task, which blocks if + * there are none pending. + * It will be unblocked whenever the semaphore is notified (i.e. when the + * number of pending tasks is incremented). + * That only happens in two circumstances: (1) when * `Workers::addTask` is called and (2) when `Workers` wants to pause some * workers ("pause one worker" is considered one task), which happens when * someone wants to stop the workers or shrink the threadpool. No worker * threads are ever destroyed until `Workers` is destroyed; it merely pauses * workers until then. * - * When an idle worker is woken, it checks whether `Workers` is trying to pause - * workers. If so, it adds itself to the set of paused workers and blocks on + * When a waiting worker is woken, it checks whether `Workers` is trying to + * pause workers. If so, it changes its status from active to paused and + * blocks on * its own condition variable. If not, then it calls `processTask` on the * "callback" held by `Workers`. 
* @@ -62,8 +73,8 @@ class PerfLog; * to exit is only set in the destructor of `Worker`, which unblocks the * paused thread and waits for it to exit. A `Worker::run` thread checks * whether it needs to exit only when it is woken from a pause (not when it is - * woken from idle). This is why the destructor for `Workers` pauses all the - * workers before destroying them. + * woken from waiting). This is why the destructor for `Workers` pauses all + * the workers before destroying them. */ class Workers { diff --git a/src/ripple/net/impl/RPCSub.cpp b/src/ripple/net/impl/RPCSub.cpp index f65f9a3614..8b052e817c 100644 --- a/src/ripple/net/impl/RPCSub.cpp +++ b/src/ripple/net/impl/RPCSub.cpp @@ -96,7 +96,7 @@ public: JLOG(j_.info()) << "RPCCall::fromNetwork start"; mSending = m_jobQueue.addJob( - jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this](Job&) { + jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() { sendThread(); }); } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 7a15109967..6f05328212 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1070,7 +1070,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // VFALCO What's the right job type? auto that = shared_from_this(); app_.getJobQueue().addJob( - jtVALIDATION_ut, "receiveManifests", [this, that, m](Job&) { + jtVALIDATION_ut, "receiveManifests", [this, that, m]() { overlay_.onManifests(m, that); }); } @@ -1608,7 +1608,7 @@ PeerImp::handleTransaction( [weak = std::weak_ptr(shared_from_this()), flags, checkSignature, - stx](Job&) { + stx]() { if (auto peer = weak.lock()) peer->checkTransaction(flags, checkSignature, stx); }); @@ -1711,7 +1711,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // Queue a job to process the request std::weak_ptr weak = shared_from_this(); - app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m](Job&) { + app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() { if (auto peer = weak.lock()) peer->processLedgerRequest(m); }); @@ -1730,7 +1730,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_ = Resource::feeMediumBurdenPeer; std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREPLAY_REQ, "recvProofPathRequest", [weak, m](Job&) { + jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() { if (auto peer = weak.lock()) { auto reply = @@ -1779,7 +1779,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_ = Resource::feeMediumBurdenPeer; std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m](Job&) { + jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() { if (auto peer = weak.lock()) { auto reply = @@ -1900,7 +1900,7 @@ PeerImp::onMessage(std::shared_ptr const& m) { std::weak_ptr weak{shared_from_this()}; app_.getJobQueue().addJob( - jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m](Job&) { + jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() { if (auto peer = weak.lock()) { peer->app_.getInboundTransactions().gotData( @@ -2013,9 +2013,9 @@ PeerImp::onMessage(std::shared_ptr const& m) app_.getJobQueue().addJob( isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", - [weak, m, proposal](Job& job) { + [weak, isTrusted, m, proposal]() { if (auto peer = weak.lock()) - peer->checkPropose(job, m, proposal); + peer->checkPropose(isTrusted, m, proposal); }); } @@ -2602,7 +2602,7 @@ PeerImp::onMessage(std::shared_ptr const& m) app_.getJobQueue().addJob( isTrusted ? 
jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", - [weak, val, m](Job&) { + [weak, val, m]() { if (auto peer = weak.lock()) peer->checkValidation(val, m); }); @@ -2655,7 +2655,7 @@ PeerImp::onMessage(std::shared_ptr const& m) std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREQUESTED_TXN, "doTransactions", [weak, m](Job&) { + jtREQUESTED_TXN, "doTransactions", [weak, m]() { if (auto peer = weak.lock()) peer->doTransactions(m); }); @@ -2795,7 +2795,7 @@ PeerImp::onMessage(std::shared_ptr const& m) std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtMISSING_TXN, "handleHaveTransactions", [weak, m](Job&) { + jtMISSING_TXN, "handleHaveTransactions", [weak, m]() { if (auto peer = weak.lock()) peer->handleHaveTransactions(m); }); @@ -2975,7 +2975,7 @@ PeerImp::doFetchPack(const std::shared_ptr& packet) auto elapsed = UptimeClock::now(); auto const pap = &app_; app_.getJobQueue().addJob( - jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed](Job&) { + jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() { pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed); }); } @@ -3111,12 +3111,10 @@ PeerImp::checkTransaction( // Called from our JobQueue void PeerImp::checkPropose( - Job& job, + bool isTrusted, std::shared_ptr const& packet, RCLCxPeerPos peerPos) { - bool isTrusted = (job.getType() == jtPROPOSAL_t); - JLOG(p_journal_.trace()) << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal"; diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index f7bfbbc9d6..8bed64e724 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -619,7 +619,7 @@ private: void checkPropose( - Job& job, + bool isTrusted, std::shared_ptr const& packet, RCLCxPeerPos peerPos); diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index c52c1b5017..efa422c8bf 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -360,7 +360,7 @@ ShardArchiveHandler::next(std::lock_guard const& l) // to prevent holding up the lock if the downloader // sleeps. auto const& url{archives_.begin()->second}; - auto wrapper = jobCounter_.wrap([this, url, dstDir](Job&) { + auto wrapper = jobCounter_.wrap([this, url, dstDir]() { auto const ssl = (url.scheme == "https"); auto const defaultPort = ssl ? 
443 : 80; @@ -416,42 +416,41 @@ ShardArchiveHandler::complete(path dstPath) } // Make lambdas mutable captured vars can be moved from - auto wrapper = - jobCounter_.wrap([=, dstPath = std::move(dstPath)](Job&) mutable { - if (stopping_) - return; + auto wrapper = jobCounter_.wrap([=, + dstPath = std::move(dstPath)]() mutable { + if (stopping_) + return; - // If not synced then defer and retry - auto const mode{app_.getOPs().getOperatingMode()}; - if (mode != OperatingMode::FULL) - { - std::lock_guard lock(m_); - timer_.expires_from_now(static_cast( - (static_cast(OperatingMode::FULL) - - static_cast(mode)) * - 10)); + // If not synced then defer and retry + auto const mode{app_.getOPs().getOperatingMode()}; + if (mode != OperatingMode::FULL) + { + std::lock_guard lock(m_); + timer_.expires_from_now(static_cast( + (static_cast(OperatingMode::FULL) - + static_cast(mode)) * + 10)); - auto wrapper = timerCounter_.wrap( - [=, dstPath = std::move(dstPath)]( - boost::system::error_code const& ec) mutable { - if (ec != boost::asio::error::operation_aborted) - complete(std::move(dstPath)); - }); + auto wrapper = timerCounter_.wrap( + [=, dstPath = std::move(dstPath)]( + boost::system::error_code const& ec) mutable { + if (ec != boost::asio::error::operation_aborted) + complete(std::move(dstPath)); + }); - if (!wrapper) - onClosureFailed( - "failed to wrap closure for operating mode timer", - lock); - else - timer_.async_wait(*wrapper); - } + if (!wrapper) + onClosureFailed( + "failed to wrap closure for operating mode timer", lock); else - { - process(dstPath); - std::lock_guard lock(m_); - removeAndProceed(lock); - } - }); + timer_.async_wait(*wrapper); + } + else + { + process(dstPath); + std::lock_guard lock(m_); + removeAndProceed(lock); + } + }); if (!wrapper) { diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp index 9446027849..8937255a75 100644 --- a/src/test/core/Coroutine_test.cpp +++ b/src/test/core/Coroutine_test.cpp @@ -127,7 +127,7 @@ public: BEAST_EXPECT(*lv == -1); gate g; - jq.addJob(jtCLIENT, "LocalValue-Test", [&](auto const& job) { + jq.addJob(jtCLIENT, "LocalValue-Test", [&]() { this->BEAST_EXPECT(*lv == -1); *lv = -2; this->BEAST_EXPECT(*lv == -2); @@ -166,7 +166,7 @@ public: c->join(); } - jq.addJob(jtCLIENT, "LocalValue-Test", [&](auto const& job) { + jq.addJob(jtCLIENT, "LocalValue-Test", [&]() { this->BEAST_EXPECT(*lv == -2); g.signal(); }); diff --git a/src/test/core/JobQueue_test.cpp b/src/test/core/JobQueue_test.cpp index d0c698099c..cba0217675 100644 --- a/src/test/core/JobQueue_test.cpp +++ b/src/test/core/JobQueue_test.cpp @@ -37,10 +37,9 @@ class JobQueue_test : public beast::unit_test::suite { // addJob() should run the Job (and return true). std::atomic jobRan{false}; - BEAST_EXPECT( - jQueue.addJob(jtCLIENT, "JobAddTest1", [&jobRan](Job&) { - jobRan = true; - }) == true); + BEAST_EXPECT(jQueue.addJob(jtCLIENT, "JobAddTest1", [&jobRan]() { + jobRan = true; + }) == true); // Wait for the Job to run. while (jobRan == false) @@ -58,7 +57,7 @@ class JobQueue_test : public beast::unit_test::suite // Not recommended for the faint of heart... bool unprotected; BEAST_EXPECT( - jQueue.addJob(jtCLIENT, "JobAddTest2", [&unprotected](Job&) { + jQueue.addJob(jtCLIENT, "JobAddTest2", [&unprotected]() { unprotected = false; }) == false); }
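The tests above exercise the new zero-argument handlers directly. For code layered on JobCounter, the migration implied by this patch is equally mechanical. A sketch (illustrative only; `work` is a placeholder):

    JobCounter counter;

    // Before this patch (JobCounter was ClosureCounter<void, Job&>):
    //   auto wrapped = counter.wrap([](Job&) { work(); });

    // After (JobCounter is ClosureCounter<void>):
    auto wrapped = counter.wrap([] { work(); });
    if (wrapped)
        (*wrapped)();  // invoke exactly like the original closure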