diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 859c66d31..be8f2af8c 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -131,9 +131,7 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash) acquiringLedger_ = hash; app_.getJobQueue().addJob( - jtADVANCE, - "getConsensusLedger", - [id = hash, &app = app_](Job&) { + jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() { app.getInboundLedgers().acquire( id, 0, InboundLedger::Reason::CONSENSUS); }); @@ -423,9 +421,7 @@ RCLConsensus::Adaptor::onAccept( Json::Value&& consensusJson) { app_.getJobQueue().addJob( - jtACCEPT, - "acceptLedger", - [=, cj = std::move(consensusJson)](auto&) mutable { + jtACCEPT, "acceptLedger", [=, cj = std::move(consensusJson)]() mutable { // Note that no lock is held or acquired during this job. // This is because generic Consensus guarantees that once a ledger // is accepted, the consensus results and capture by reference state diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp index 0fcf0660b..ab9391385 100644 --- a/src/ripple/app/consensus/RCLValidations.cpp +++ b/src/ripple/app/consensus/RCLValidations.cpp @@ -135,7 +135,7 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash) Application* pApp = &app_; app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [pApp, hash](Job&) { + jtADVANCE, "getConsensusLedger", [pApp, hash]() { pApp->getInboundLedgers().acquire( hash, 0, InboundLedger::Reason::CONSENSUS); }); diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.cpp b/src/ripple/app/ledger/ConsensusTransSetSF.cpp index 881424ff1..997a2aee1 100644 --- a/src/ripple/app/ledger/ConsensusTransSetSF.cpp +++ b/src/ripple/app/ledger/ConsensusTransSetSF.cpp @@ -62,10 +62,9 @@ ConsensusTransSetSF::gotNode( auto stx = std::make_shared(std::ref(sit)); assert(stx->getTransactionID() == nodeHash.as_uint256()); auto const pap = &app_; - app_.getJobQueue().addJob( - jtTRANSACTION, "TXS->TXN", [pap, stx](Job&) { - pap->getOPs().submitTransaction(stx); - }); + app_.getJobQueue().addJob(jtTRANSACTION, "TXS->TXN", [pap, stx]() { + pap->getOPs().submitTransaction(stx); + }); } catch (std::exception const&) { diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp index 4a12a3999..cac9c40bb 100644 --- a/src/ripple/app/ledger/Ledger.cpp +++ b/src/ripple/app/ledger/Ledger.cpp @@ -981,10 +981,9 @@ pendSaveValidated( // See if we can use the JobQueue. if (!isSynchronous && - app.getJobQueue().addJob( - jobType, jobName, [&app, ledger, isCurrent](Job&) { - saveValidatedLedger(app, ledger, isCurrent); - })) + app.getJobQueue().addJob(jobType, jobName, [&app, ledger, isCurrent]() { + saveValidatedLedger(app, ledger, isCurrent); + })) { return true; } diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index eb296f0de..dbb01f54a 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -301,7 +301,7 @@ private: setPubLedger(std::shared_ptr const& l); void - tryFill(Job& job, std::shared_ptr ledger); + tryFill(std::shared_ptr ledger); void getFetchPack(LedgerIndex missing, InboundLedger::Reason reason); @@ -326,7 +326,7 @@ private: findNewLedgersToPublish(std::unique_lock&); void - updatePaths(Job& job); + updatePaths(); // Returns true if work started. Always called with m_mutex locked. // The passed lock is a reminder to callers. diff --git a/src/ripple/app/ledger/OrderBookDB.cpp b/src/ripple/app/ledger/OrderBookDB.cpp index 4b598de1b..b9f72b715 100644 --- a/src/ripple/app/ledger/OrderBookDB.cpp +++ b/src/ripple/app/ledger/OrderBookDB.cpp @@ -68,7 +68,7 @@ OrderBookDB::setup(std::shared_ptr const& ledger) update(ledger); else app_.getJobQueue().addJob( - jtUPDATE_PF, "OrderBookDB::update", [this, ledger](Job&) { + jtUPDATE_PF, "OrderBookDB::update", [this, ledger]() { update(ledger); }); } diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index f961d28b7..979c14544 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -527,7 +527,7 @@ InboundLedger::done() // We hold the PeerSet lock, so must dispatch app_.getJobQueue().addJob( - jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()](Job&) { + jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() { if (self->complete_ && !self->failed_) { self->app_.getLedgerMaster().checkAccept(self->getLedger()); diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 8ee3443a2..76681ea0a 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -183,7 +183,7 @@ public: // dispatch if (ledger->gotData(std::weak_ptr(peer), packet)) app_.getJobQueue().addJob( - jtLEDGER_DATA, "processLedgerData", [ledger](Job&) { + jtLEDGER_DATA, "processLedgerData", [ledger]() { ledger->runData(); }); @@ -198,7 +198,7 @@ public: if (packet->type() == protocol::liAS_NODE) { app_.getJobQueue().addJob( - jtLEDGER_DATA, "gotStaleData", [this, packet](Job&) { + jtLEDGER_DATA, "gotStaleData", [this, packet]() { gotStaleData(packet); }); } diff --git a/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp b/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp index 122915eed..3c19c6ee1 100644 --- a/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp +++ b/src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp @@ -240,7 +240,7 @@ LedgerDeltaAcquire::onLedgerBuilt( app_.getJobQueue().addJob( jtREPLAY_TASK, "onLedgerBuilt", - [=, ledger = this->fullLedger_, &app = this->app_](Job&) { + [=, ledger = this->fullLedger_, &app = this->app_]() { for (auto reason : reasons) { switch (reason) diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 4cc195815..0dd0ba1ee 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -699,7 +699,7 @@ LedgerMaster::getEarliestFetch() } void -LedgerMaster::tryFill(Job& job, std::shared_ptr ledger) +LedgerMaster::tryFill(std::shared_ptr ledger) { std::uint32_t seq = ledger->info().seq; uint256 prevHash = ledger->info().parentHash; @@ -710,7 +710,7 @@ LedgerMaster::tryFill(Job& job, std::shared_ptr ledger) std::uint32_t maxHas = seq; NodeStore::Database& nodeStore{app_.getNodeStore()}; - while (!job.shouldCancel() && seq > 0) + while (!app_.getJobQueue().isStopping() && seq > 0) { { std::lock_guard ml(m_mutex); @@ -1453,7 +1453,7 @@ LedgerMaster::tryAdvance() if (!mAdvanceThread && !mValidLedger.empty()) { mAdvanceThread = true; - app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this](Job&) { + app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() { std::unique_lock sl(m_mutex); assert(!mValidLedger.empty() && mAdvanceThread); @@ -1476,7 +1476,7 @@ LedgerMaster::tryAdvance() } void -LedgerMaster::updatePaths(Job& job) +LedgerMaster::updatePaths() { { std::lock_guard ml(m_mutex); @@ -1487,7 +1487,7 @@ LedgerMaster::updatePaths(Job& job) } } - while (!job.shouldCancel()) + while (!app_.getJobQueue().isStopping()) { std::shared_ptr lastLedger; { @@ -1527,8 +1527,7 @@ LedgerMaster::updatePaths(Job& job) try { - app_.getPathRequests().updateAll( - lastLedger, job.getCancelCallback()); + app_.getPathRequests().updateAll(lastLedger); } catch (SHAMapMissingNode const& mn) { @@ -1591,7 +1590,7 @@ LedgerMaster::newPFWork( if (mPathFindThread < 2) { if (app_.getJobQueue().addJob( - jtUPDATE_PF, name, [this](Job& j) { updatePaths(j); })) + jtUPDATE_PF, name, [this]() { updatePaths(); })) { ++mPathFindThread; } @@ -1942,8 +1941,8 @@ LedgerMaster::fetchForHistory( mFillInProgress = seq; } app_.getJobQueue().addJob( - jtADVANCE, "tryFill", [this, ledger](Job& j) { - tryFill(j, ledger); + jtADVANCE, "tryFill", [this, ledger]() { + tryFill(ledger); }); } } @@ -2124,7 +2123,7 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq) { if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire)) { - app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&](Job&) { + app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() { app_.getInboundLedgers().gotFetchPack(); mGotFetchPackThread.clear(std::memory_order_release); }); diff --git a/src/ripple/app/ledger/impl/TimeoutCounter.cpp b/src/ripple/app/ledger/impl/TimeoutCounter.cpp index ff43a0e1a..9ea20c063 100644 --- a/src/ripple/app/ledger/impl/TimeoutCounter.cpp +++ b/src/ripple/app/ledger/impl/TimeoutCounter.cpp @@ -83,7 +83,7 @@ TimeoutCounter::queueJob(ScopedLockType& sl) app_.getJobQueue().addJob( queueJobParameter_.jobType, queueJobParameter_.jobName, - [wptr = pmDowncast()](Job&) { + [wptr = pmDowncast()]() { if (auto sptr = wptr.lock(); sptr) sptr->invokeOnTimer(); }); diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp index 3cd4a301e..7d958cba8 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp +++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp @@ -81,7 +81,7 @@ TransactionAcquire::done() // just updates the consensus and related structures when we acquire // a transaction set. No need to update them if we're shutting down. app_.getJobQueue().addJob( - jtTXN_DATA, "completeAcquire", [pap, hash, map](Job&) { + jtTXN_DATA, "completeAcquire", [pap, hash, map]() { pap->getInboundTransactions().giveSet(hash, map, true); }); } diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 8040decea..ac0b6c947 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1085,7 +1085,7 @@ public: if (e.value() == boost::system::errc::success) { m_jobQueue->addJob( - jtSWEEP, "sweep", [this](Job&) { doSweep(); }); + jtSWEEP, "sweep", [this]() { doSweep(); }); } // Recover as best we can if an unexpected error occurs. if (e.value() != boost::system::errc::success && diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp index 0e6c14adb..0ac890964 100644 --- a/src/ripple/app/main/NodeStoreScheduler.cpp +++ b/src/ripple/app/main/NodeStoreScheduler.cpp @@ -32,7 +32,7 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task) if (jobQueue_.isStopped()) return; - if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task](Job&) { + if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task]() { task.performScheduledTask(); })) { diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 574ce040b..2b9c5f316 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -949,7 +949,7 @@ NetworkOPsImp::setHeartbeatTimer() heartbeatTimer_, mConsensus.parms().ledgerGRANULARITY, [this]() { - m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) { + m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { processHeartbeatTimer(); }); }, @@ -964,7 +964,7 @@ NetworkOPsImp::setClusterTimer() clusterTimer_, 10s, [this]() { - m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) { + m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() { processClusterTimer(); }); }, @@ -1153,7 +1153,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr const& iTrans) auto tx = std::make_shared(trans, reason, app_); - m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx](Job&) { + m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { auto t = tx; processTransaction(t, false, false, FailHard::no); }); @@ -1224,9 +1224,8 @@ NetworkOPsImp::doTransactionAsync( if (mDispatchState == DispatchState::none) { - if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this](Job&) { - transactionBatch(); - })) + if (m_job_queue.addJob( + jtBATCH, "transactionBatch", [this]() { transactionBatch(); })) { mDispatchState = DispatchState::scheduled; } @@ -1262,10 +1261,9 @@ NetworkOPsImp::doTransactionSync( if (mTransactions.size()) { // More transactions need to be applied, but by another job. - if (m_job_queue.addJob( - jtBATCH, "transactionBatch", [this](Job&) { - transactionBatch(); - })) + if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() { + transactionBatch(); + })) { mDispatchState = DispatchState::scheduled; } @@ -2941,7 +2939,7 @@ NetworkOPsImp::reportFeeChange() if (f != mLastFeeSummary) { m_job_queue.addJob( - jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this](Job&) { + jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() { pubServer(); }); } @@ -2953,7 +2951,7 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase) m_job_queue.addJob( jtCLIENT_CONSENSUS, "reportConsensusStateChange->pubConsensus", - [this, phase](Job&) { pubConsensus(phase); }); + [this, phase]() { pubConsensus(phase); }); } inline void @@ -3346,7 +3344,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo) app_.getJobQueue().addJob( jtCLIENT_ACCT_HIST, "AccountHistoryTxStream", - [this, dbType = databaseType, subInfo](Job&) { + [this, dbType = databaseType, subInfo]() { auto const& accountId = subInfo.index_->accountId_; auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_; auto& txHistoryIndex = subInfo.index_->historyTxIndex_; diff --git a/src/ripple/app/paths/PathRequests.cpp b/src/ripple/app/paths/PathRequests.cpp index fe78f360f..50e591eb1 100644 --- a/src/ripple/app/paths/PathRequests.cpp +++ b/src/ripple/app/paths/PathRequests.cpp @@ -55,9 +55,7 @@ PathRequests::getLineCache( } void -PathRequests::updateAll( - std::shared_ptr const& inLedger, - Job::CancelCallback shouldCancel) +PathRequests::updateAll(std::shared_ptr const& inLedger) { auto event = app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll"); @@ -84,7 +82,7 @@ PathRequests::updateAll( { for (auto const& wr : requests) { - if (shouldCancel()) + if (app_.getJobQueue().isStopping()) break; auto request = wr.lock(); @@ -174,7 +172,7 @@ PathRequests::updateAll( requests = requests_; cache = getLineCache(cache->getLedger(), false); } - } while (!shouldCancel()); + } while (!app_.getJobQueue().isStopping()); JLOG(mJournal.debug()) << "updateAll complete: " << processed << " processed and " << removed << " removed"; diff --git a/src/ripple/app/paths/PathRequests.h b/src/ripple/app/paths/PathRequests.h index d95ab9c5d..75b852867 100644 --- a/src/ripple/app/paths/PathRequests.h +++ b/src/ripple/app/paths/PathRequests.h @@ -47,12 +47,9 @@ public: /** Update all of the contained PathRequest instances. @param ledger Ledger we are pathfinding in. - @param shouldCancel Invocable that returns whether to cancel. */ void - updateAll( - std::shared_ptr const& ledger, - Job::CancelCallback shouldCancel); + updateAll(std::shared_ptr const& ledger); std::shared_ptr getLineCache( diff --git a/src/ripple/core/ClosureCounter.h b/src/ripple/core/ClosureCounter.h index 7c0d36d9f..9f5bc3ee6 100644 --- a/src/ripple/core/ClosureCounter.h +++ b/src/ripple/core/ClosureCounter.h @@ -31,21 +31,21 @@ namespace ripple { /** * The role of a `ClosureCounter` is to assist in shutdown by letting callers - * wait for the completion of callbacks (of a single type signature) that they - * previously scheduled. The lifetime of a `ClosureCounter` consists of two + * wait for the completion of closures (of a specific type signature) that they + * previously registered. These closures are typically callbacks for + * asynchronous operations. The lifetime of a `ClosureCounter` consists of two * phases: the initial expanding "fork" phase, and the subsequent shrinking * "join" phase. * - * In the fork phase, callers register a callback by passing the callback and + * In the fork phase, callers register a closure by passing the closure and * receiving a substitute in return. The substitute has the same callable - * interface as the callback, and it informs the `ClosureCounter` whenever it + * interface as the closure, and it informs the `ClosureCounter` whenever it * is copied or destroyed, so that it can keep an accurate count of copies. * * The transition to the join phase is made by a call to `join`. In this - * phase, every substitute returned going forward will be empty, signaling to - * the caller that they should just drop the callback and cancel their - * asynchronous operation. `join` blocks until all existing callback - * substitutes are destroyed. + * phase, every substitute returned going forward will be null, signaling to + * the caller that they should drop the closure and cancel their operation. + * `join` blocks until all existing closure substitutes are destroyed. * * \tparam Ret_t The return type of the closure. * \tparam Args_t The argument types of the closure. diff --git a/src/ripple/core/Coro.ipp b/src/ripple/core/Coro.ipp index 0234f9092..2b07c5a45 100644 --- a/src/ripple/core/Coro.ipp +++ b/src/ripple/core/Coro.ipp @@ -77,7 +77,7 @@ JobQueue::Coro::post() // sp keeps 'this' alive if (jq_.addJob( - type_, name_, [this, sp = shared_from_this()](Job&) { resume(); })) + type_, name_, [this, sp = shared_from_this()]() { resume(); })) { return true; } diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 66aa0d051..0f3bb718b 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -109,43 +109,25 @@ public: // Job(); - // Job (Job const& other); - Job(JobType type, std::uint64_t index); - /** A callback used to check for canceling a job. */ - using CancelCallback = std::function; - // VFALCO TODO try to remove the dependency on LoadMonitor. Job(JobType type, std::string const& name, std::uint64_t index, LoadMonitor& lm, - std::function const& job, - CancelCallback cancelCallback); - - // Job& operator= (Job const& other); + std::function const& job); JobType getType() const; - CancelCallback - getCancelCallback() const; - /** Returns the time when the job was queued. */ clock_type::time_point const& queue_time() const; - /** Returns `true` if the running job should make a best-effort cancel. */ - bool - shouldCancel() const; - void doJob(); - void - rename(std::string const& n); - // These comparison operators make the jobs sort in priority order // in the job set bool @@ -158,16 +140,15 @@ public: operator>=(const Job& j) const; private: - CancelCallback m_cancelCallback; JobType mType; std::uint64_t mJobIndex; - std::function mJob; + std::function mJob; std::shared_ptr m_loadEvent; std::string mName; clock_type::time_point m_queue_time; }; -using JobCounter = ClosureCounter; +using JobCounter = ClosureCounter; } // namespace ripple diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index a93d68b85..a9d541baa 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -138,7 +138,7 @@ public: join(); }; - using JobFunction = std::function; + using JobFunction = std::function; JobQueue( int threadCount, @@ -160,7 +160,7 @@ public: template < typename JobHandler, typename = std::enable_if_t()(std::declval())), + decltype(std::declval()()), void>::value>> bool addJob(JobType type, std::string const& name, JobHandler&& jobHandler) @@ -259,7 +259,6 @@ private: int nSuspend_ = 0; Workers m_workers; - Job::CancelCallback m_cancelCallback; // Statistics tracking perf::PerfLog& perfLog_; @@ -288,23 +287,6 @@ private: std::string const& name, JobFunction const& func); - // Signals an added Job for processing. - // - // Pre-conditions: - // The JobType must be valid. - // The Job must exist in mJobSet. - // The Job must not have previously been queued. - // - // Post-conditions: - // Count of waiting jobs of that type will be incremented. - // If JobQueue exists, and has at least one thread, Job will eventually - // run. - // - // Invariants: - // The calling thread owns the JobLock - void - queueJob(Job const& job, std::lock_guard const& lock); - // Returns the next Job we should run now. // // RunnableJob: diff --git a/src/ripple/core/impl/Job.cpp b/src/ripple/core/impl/Job.cpp index a9b82ccf3..780a9f49c 100644 --- a/src/ripple/core/impl/Job.cpp +++ b/src/ripple/core/impl/Job.cpp @@ -36,10 +36,8 @@ Job::Job( std::string const& name, std::uint64_t index, LoadMonitor& lm, - std::function const& job, - CancelCallback cancelCallback) - : m_cancelCallback(cancelCallback) - , mType(type) + std::function const& job) + : mType(type) , mJobIndex(index) , mJob(job) , mName(name) @@ -54,27 +52,12 @@ Job::getType() const return mType; } -Job::CancelCallback -Job::getCancelCallback() const -{ - assert(m_cancelCallback); - return m_cancelCallback; -} - Job::clock_type::time_point const& Job::queue_time() const { return m_queue_time; } -bool -Job::shouldCancel() const -{ - if (m_cancelCallback) - return m_cancelCallback(); - return false; -} - void Job::doJob() { @@ -82,19 +65,13 @@ Job::doJob() m_loadEvent->start(); m_loadEvent->setName(mName); - mJob(*this); + mJob(); // Destroy the lambda, otherwise we won't include // its duration in the time measurement mJob = nullptr; } -void -Job::rename(std::string const& newName) -{ - mName = newName; -} - bool Job::operator>(const Job& j) const { diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index 85dcec1ca..28947300c 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -35,7 +35,6 @@ JobQueue::JobQueue( , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs) , m_processCount(0) , m_workers(*this, &perfLog, "JobQueue", threadCount) - , m_cancelCallback(std::bind(&JobQueue::isStopping, this)) , perfLog_(perfLog) , m_collector(collector) { @@ -100,9 +99,27 @@ JobQueue::addRefCountedJob( { std::lock_guard lock(m_mutex); - auto result = m_jobSet.emplace( - type, name, ++m_lastJob, data.load(), func, m_cancelCallback); - queueJob(*result.first, lock); + auto result = + m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func); + auto const& job = *result.first; + + JobType const type(job.getType()); + assert(type != jtINVALID); + assert(m_jobSet.find(job) != m_jobSet.end()); + perfLog_.jobQueue(type); + + JobTypeData& data(getJobTypeData(type)); + + if (data.waiting + data.running < getJobLimit(type)) + { + m_workers.addTask(); + } + else + { + // defer the task until we go below the limit + ++data.deferred; + } + ++data.waiting; } return true; } @@ -282,29 +299,6 @@ JobQueue::isStopped() const return stopped_; } -void -JobQueue::queueJob(Job const& job, std::lock_guard const& lock) -{ - JobType const type(job.getType()); - assert(type != jtINVALID); - assert(m_jobSet.find(job) != m_jobSet.end()); - perfLog_.jobQueue(type); - - JobTypeData& data(getJobTypeData(type)); - - if (data.waiting + data.running < getJobLimit(type)) - { - m_workers.addTask(); - } - else - { - // defer the task until we go below the limit - // - ++data.deferred; - } - ++data.waiting; -} - void JobQueue::getNextJob(Job& job) { @@ -313,30 +307,25 @@ JobQueue::getNextJob(Job& job) std::set::const_iterator iter; for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter) { - JobTypeData& data(getJobTypeData(iter->getType())); + JobType const type = iter->getType(); + assert(type != jtINVALID); - assert(data.running <= getJobLimit(data.type())); + JobTypeData& data(getJobTypeData(type)); + assert(data.running <= getJobLimit(type)); // Run this job if we're running below the limit. if (data.running < getJobLimit(data.type())) { assert(data.waiting > 0); + --data.waiting; + ++data.running; break; } } assert(iter != m_jobSet.end()); - - JobType const type = iter->getType(); - JobTypeData& data(getJobTypeData(type)); - - assert(type != jtINVALID); - job = *iter; m_jobSet.erase(iter); - - --data.waiting; - ++data.running; } void diff --git a/src/ripple/core/impl/SociDB.cpp b/src/ripple/core/impl/SociDB.cpp index 419784c12..2d1332636 100644 --- a/src/ripple/core/impl/SociDB.cpp +++ b/src/ripple/core/impl/SociDB.cpp @@ -254,7 +254,7 @@ public: // There is a separate check in `checkpoint` for a valid // connection in the rare case when the DatabaseCon is destroyed // after locking this weak_ptr - [wp = std::weak_ptr{shared_from_this()}](Job&) { + [wp = std::weak_ptr{shared_from_this()}]() { if (auto self = wp.lock()) self->checkpoint(); })) diff --git a/src/ripple/core/impl/Workers.h b/src/ripple/core/impl/Workers.h index 0abbbe429..88ab8fa27 100644 --- a/src/ripple/core/impl/Workers.h +++ b/src/ripple/core/impl/Workers.h @@ -37,24 +37,35 @@ class PerfLog; /** * `Workers` is effectively a thread pool. The constructor takes a "callback" * that has a `void processTask(int instance)` method, and a number of - * workers. It creates that many Workers and then waits for calls to + * workers. It creates that many `Worker`s and then waits for calls to * `Workers::addTask()`. It holds a semaphore that counts the number of - * waiting tasks, and a condition variable for the event when the last worker - * pauses itself. + * pending "tasks", and a condition variable for the event when the last + * worker pauses itself. + * + * A "task" is just a call to the callback's `processTask` method. + * "Adding a task" means calling that method now, or remembering to call it in + * the future. + * This is implemented with a semaphore. + * If there are any workers waiting when a task is added, then one will be + * woken to claim the task. + * If not, then the next worker to wait on the semaphore will claim the task. * * Creating a `Worker` creates a thread that calls `Worker::run()`. When that * thread enters `Worker::run`, it increments the count of active workers in - * the parent `Workers` object and then blocks on the semaphore if there are - * no waiting tasks. It will be unblocked whenever the number of waiting tasks - * is incremented. That only happens in two circumstances: (1) when + * the parent `Workers` object and then tries to claim a task, which blocks if + * there are none pending. + * It will be unblocked whenever the semaphore is notified (i.e. when the + * number of pending tasks is incremented). + * That only happens in two circumstances: (1) when * `Workers::addTask` is called and (2) when `Workers` wants to pause some * workers ("pause one worker" is considered one task), which happens when * someone wants to stop the workers or shrink the threadpool. No worker * threads are ever destroyed until `Workers` is destroyed; it merely pauses * workers until then. * - * When an idle worker is woken, it checks whether `Workers` is trying to pause - * workers. If so, it adds itself to the set of paused workers and blocks on + * When a waiting worker is woken, it checks whether `Workers` is trying to + * pause workers. If so, it changes its status from active to paused and + * blocks on * its own condition variable. If not, then it calls `processTask` on the * "callback" held by `Workers`. * @@ -62,8 +73,8 @@ class PerfLog; * to exit is only set in the destructor of `Worker`, which unblocks the * paused thread and waits for it to exit. A `Worker::run` thread checks * whether it needs to exit only when it is woken from a pause (not when it is - * woken from idle). This is why the destructor for `Workers` pauses all the - * workers before destroying them. + * woken from waiting). This is why the destructor for `Workers` pauses all + * the workers before destroying them. */ class Workers { diff --git a/src/ripple/net/impl/RPCSub.cpp b/src/ripple/net/impl/RPCSub.cpp index f65f9a361..8b052e817 100644 --- a/src/ripple/net/impl/RPCSub.cpp +++ b/src/ripple/net/impl/RPCSub.cpp @@ -96,7 +96,7 @@ public: JLOG(j_.info()) << "RPCCall::fromNetwork start"; mSending = m_jobQueue.addJob( - jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this](Job&) { + jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() { sendThread(); }); } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 7a1510996..6f0532821 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1070,7 +1070,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // VFALCO What's the right job type? auto that = shared_from_this(); app_.getJobQueue().addJob( - jtVALIDATION_ut, "receiveManifests", [this, that, m](Job&) { + jtVALIDATION_ut, "receiveManifests", [this, that, m]() { overlay_.onManifests(m, that); }); } @@ -1608,7 +1608,7 @@ PeerImp::handleTransaction( [weak = std::weak_ptr(shared_from_this()), flags, checkSignature, - stx](Job&) { + stx]() { if (auto peer = weak.lock()) peer->checkTransaction(flags, checkSignature, stx); }); @@ -1711,7 +1711,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // Queue a job to process the request std::weak_ptr weak = shared_from_this(); - app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m](Job&) { + app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() { if (auto peer = weak.lock()) peer->processLedgerRequest(m); }); @@ -1730,7 +1730,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_ = Resource::feeMediumBurdenPeer; std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREPLAY_REQ, "recvProofPathRequest", [weak, m](Job&) { + jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() { if (auto peer = weak.lock()) { auto reply = @@ -1779,7 +1779,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_ = Resource::feeMediumBurdenPeer; std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m](Job&) { + jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() { if (auto peer = weak.lock()) { auto reply = @@ -1900,7 +1900,7 @@ PeerImp::onMessage(std::shared_ptr const& m) { std::weak_ptr weak{shared_from_this()}; app_.getJobQueue().addJob( - jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m](Job&) { + jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() { if (auto peer = weak.lock()) { peer->app_.getInboundTransactions().gotData( @@ -2013,9 +2013,9 @@ PeerImp::onMessage(std::shared_ptr const& m) app_.getJobQueue().addJob( isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", - [weak, m, proposal](Job& job) { + [weak, isTrusted, m, proposal]() { if (auto peer = weak.lock()) - peer->checkPropose(job, m, proposal); + peer->checkPropose(isTrusted, m, proposal); }); } @@ -2602,7 +2602,7 @@ PeerImp::onMessage(std::shared_ptr const& m) app_.getJobQueue().addJob( isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", - [weak, val, m](Job&) { + [weak, val, m]() { if (auto peer = weak.lock()) peer->checkValidation(val, m); }); @@ -2655,7 +2655,7 @@ PeerImp::onMessage(std::shared_ptr const& m) std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtREQUESTED_TXN, "doTransactions", [weak, m](Job&) { + jtREQUESTED_TXN, "doTransactions", [weak, m]() { if (auto peer = weak.lock()) peer->doTransactions(m); }); @@ -2795,7 +2795,7 @@ PeerImp::onMessage(std::shared_ptr const& m) std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( - jtMISSING_TXN, "handleHaveTransactions", [weak, m](Job&) { + jtMISSING_TXN, "handleHaveTransactions", [weak, m]() { if (auto peer = weak.lock()) peer->handleHaveTransactions(m); }); @@ -2975,7 +2975,7 @@ PeerImp::doFetchPack(const std::shared_ptr& packet) auto elapsed = UptimeClock::now(); auto const pap = &app_; app_.getJobQueue().addJob( - jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed](Job&) { + jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() { pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed); }); } @@ -3111,12 +3111,10 @@ PeerImp::checkTransaction( // Called from our JobQueue void PeerImp::checkPropose( - Job& job, + bool isTrusted, std::shared_ptr const& packet, RCLCxPeerPos peerPos) { - bool isTrusted = (job.getType() == jtPROPOSAL_t); - JLOG(p_journal_.trace()) << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal"; diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index f7bfbbc9d..8bed64e72 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -619,7 +619,7 @@ private: void checkPropose( - Job& job, + bool isTrusted, std::shared_ptr const& packet, RCLCxPeerPos peerPos); diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index c52c1b501..efa422c8b 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -360,7 +360,7 @@ ShardArchiveHandler::next(std::lock_guard const& l) // to prevent holding up the lock if the downloader // sleeps. auto const& url{archives_.begin()->second}; - auto wrapper = jobCounter_.wrap([this, url, dstDir](Job&) { + auto wrapper = jobCounter_.wrap([this, url, dstDir]() { auto const ssl = (url.scheme == "https"); auto const defaultPort = ssl ? 443 : 80; @@ -416,42 +416,41 @@ ShardArchiveHandler::complete(path dstPath) } // Make lambdas mutable captured vars can be moved from - auto wrapper = - jobCounter_.wrap([=, dstPath = std::move(dstPath)](Job&) mutable { - if (stopping_) - return; + auto wrapper = jobCounter_.wrap([=, + dstPath = std::move(dstPath)]() mutable { + if (stopping_) + return; - // If not synced then defer and retry - auto const mode{app_.getOPs().getOperatingMode()}; - if (mode != OperatingMode::FULL) - { - std::lock_guard lock(m_); - timer_.expires_from_now(static_cast( - (static_cast(OperatingMode::FULL) - - static_cast(mode)) * - 10)); + // If not synced then defer and retry + auto const mode{app_.getOPs().getOperatingMode()}; + if (mode != OperatingMode::FULL) + { + std::lock_guard lock(m_); + timer_.expires_from_now(static_cast( + (static_cast(OperatingMode::FULL) - + static_cast(mode)) * + 10)); - auto wrapper = timerCounter_.wrap( - [=, dstPath = std::move(dstPath)]( - boost::system::error_code const& ec) mutable { - if (ec != boost::asio::error::operation_aborted) - complete(std::move(dstPath)); - }); + auto wrapper = timerCounter_.wrap( + [=, dstPath = std::move(dstPath)]( + boost::system::error_code const& ec) mutable { + if (ec != boost::asio::error::operation_aborted) + complete(std::move(dstPath)); + }); - if (!wrapper) - onClosureFailed( - "failed to wrap closure for operating mode timer", - lock); - else - timer_.async_wait(*wrapper); - } + if (!wrapper) + onClosureFailed( + "failed to wrap closure for operating mode timer", lock); else - { - process(dstPath); - std::lock_guard lock(m_); - removeAndProceed(lock); - } - }); + timer_.async_wait(*wrapper); + } + else + { + process(dstPath); + std::lock_guard lock(m_); + removeAndProceed(lock); + } + }); if (!wrapper) { diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp index 944602784..8937255a7 100644 --- a/src/test/core/Coroutine_test.cpp +++ b/src/test/core/Coroutine_test.cpp @@ -127,7 +127,7 @@ public: BEAST_EXPECT(*lv == -1); gate g; - jq.addJob(jtCLIENT, "LocalValue-Test", [&](auto const& job) { + jq.addJob(jtCLIENT, "LocalValue-Test", [&]() { this->BEAST_EXPECT(*lv == -1); *lv = -2; this->BEAST_EXPECT(*lv == -2); @@ -166,7 +166,7 @@ public: c->join(); } - jq.addJob(jtCLIENT, "LocalValue-Test", [&](auto const& job) { + jq.addJob(jtCLIENT, "LocalValue-Test", [&]() { this->BEAST_EXPECT(*lv == -2); g.signal(); }); diff --git a/src/test/core/JobQueue_test.cpp b/src/test/core/JobQueue_test.cpp index d0c698099..cba021767 100644 --- a/src/test/core/JobQueue_test.cpp +++ b/src/test/core/JobQueue_test.cpp @@ -37,10 +37,9 @@ class JobQueue_test : public beast::unit_test::suite { // addJob() should run the Job (and return true). std::atomic jobRan{false}; - BEAST_EXPECT( - jQueue.addJob(jtCLIENT, "JobAddTest1", [&jobRan](Job&) { - jobRan = true; - }) == true); + BEAST_EXPECT(jQueue.addJob(jtCLIENT, "JobAddTest1", [&jobRan]() { + jobRan = true; + }) == true); // Wait for the Job to run. while (jobRan == false) @@ -58,7 +57,7 @@ class JobQueue_test : public beast::unit_test::suite // Not recommended for the faint of heart... bool unprotected; BEAST_EXPECT( - jQueue.addJob(jtCLIENT, "JobAddTest2", [&unprotected](Job&) { + jQueue.addJob(jtCLIENT, "JobAddTest2", [&unprotected]() { unprotected = false; }) == false); }