From bce1520d4bd1da7e345b7566b15a34c2d9632f10 Mon Sep 17 00:00:00 2001 From: JCW Date: Thu, 2 Oct 2025 11:47:42 +0100 Subject: [PATCH] Address PR comments Signed-off-by: JCW --- src/xrpld/core/Coro.ipp | 11 +++-- src/xrpld/core/JobQueue.h | 68 ++++++++++++++---------------- src/xrpld/core/detail/JobQueue.cpp | 34 ++++++++++----- 3 files changed, 59 insertions(+), 54 deletions(-) diff --git a/src/xrpld/core/Coro.ipp b/src/xrpld/core/Coro.ipp index 8f6ddf95e6..67844ec672 100644 --- a/src/xrpld/core/Coro.ipp +++ b/src/xrpld/core/Coro.ipp @@ -120,14 +120,13 @@ JobQueue::Coro::post() inline void JobQueue::Coro::resume() { + auto suspended = CoroState::Suspended; + if (!state_.compare_exchange_strong(suspended, CoroState::Running)) { - auto suspended = CoroState::Suspended; - if (!state_.compare_exchange_strong(suspended, CoroState::Running)) - { - return; - } - cv_.notify_all(); + return; } + cv_.notify_all(); + { std::lock_guard lock(jq_.m_mutex); jq_.m_suspendedCoros.erase(this); diff --git a/src/xrpld/core/JobQueue.h b/src/xrpld/core/JobQueue.h index 3d4bebf155..f13abccffb 100644 --- a/src/xrpld/core/JobQueue.h +++ b/src/xrpld/core/JobQueue.h @@ -57,6 +57,13 @@ struct Coro_create_t class JobQueue : private Workers::Callback { public: + enum class QueueState + { + Accepting, + Stopping, + Stopped + }; + /** Coroutines must run to completion. */ class Coro : public std::enable_shared_from_this { @@ -168,24 +175,16 @@ public: @return true if jobHandler added to queue. */ - template < - typename JobHandler, - typename = std::enable_if_t()()), - void>::value>> + template bool addJob(JobType type, std::string const& name, JobHandler&& jobHandler) + requires std::is_void_v> { - if (!accepting_) + if (queueState_ != QueueState::Accepting) { return false; } - if (auto optionalCountedJob = - jobCounter_.wrap(std::forward(jobHandler))) - { - return addRefCountedJob(type, name, std::move(*optionalCountedJob)); - } - return false; + return addJobNoStatusCheck(type, name, std::forward(jobHandler)); } /** Creates a coroutine and adds a job to the queue which will run it. @@ -244,13 +243,16 @@ public: bool isStopping() const { - return stopping_; + return queueState_ == QueueState::Stopping; } // We may be able to move away from this, but we can keep it during the // transition. bool - isStopped() const; + isStopped() const + { + return queueState_ == QueueState::Stopped; + } private: friend class Coro; @@ -262,9 +264,7 @@ private: std::uint64_t m_lastJob; std::set m_jobSet; JobCounter jobCounter_; - std::atomic_bool accepting_ = true; - std::atomic_bool stopping_{false}; - std::atomic_bool stopped_{false}; + std::atomic queueState_{QueueState::Accepting}; JobDataMap m_jobData; JobTypeData m_invalidJobData; @@ -286,30 +286,24 @@ private: std::condition_variable cv_; - void - onStopResumeCoros(std::map>& coros) - { - for (auto& [_, coro] : coros) - { - if (auto coroPtr = coro.lock()) - { - if (auto optionalCountedJob = - jobCounter_.wrap([=]() { coroPtr->resume(); })) - { - addRefCountedJob( - coroPtr->type_, - coroPtr->name_, - std::move(*optionalCountedJob)); - } - } - } - } - void collect(); JobTypeData& getJobTypeData(JobType type); + template + bool + addJobNoStatusCheck(JobType type, std::string const& name, JobHandler&& jobHandler) + requires std::is_void_v> + { + if (auto optionalCountedJob = + jobCounter_.wrap(std::forward(jobHandler))) + { + return addRefCountedJob(type, name, std::move(*optionalCountedJob)); + } + return false; + } + // Adds a reference counted job to the JobQueue. // // param type The type of job. @@ -447,7 +441,7 @@ template std::shared_ptr JobQueue::postCoro(JobType t, std::string const& name, F&& f) { - if (!accepting_) + if (queueState_ != QueueState::Accepting) { return nullptr; } diff --git a/src/xrpld/core/detail/JobQueue.cpp b/src/xrpld/core/detail/JobQueue.cpp index 68800fee48..5068587015 100644 --- a/src/xrpld/core/detail/JobQueue.cpp +++ b/src/xrpld/core/detail/JobQueue.cpp @@ -29,7 +29,7 @@ namespace ripple { bool JobQueue::Coro::shouldStop() const { - return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_ || exiting_; + return jq_.queueState_ != QueueState::Accepting || exiting_; } JobQueue::JobQueue( @@ -305,19 +305,33 @@ JobQueue::stop() // get suspended and yield() will return immediately, so we can safely // move m_suspendedCoros, and we can assume that no coroutine will be // suspended in the future. + + auto accepting = QueueState::Accepting; + + if (!queueState_.compare_exchange_strong(accepting, QueueState::Stopping)) + { + XRPL_ASSERT(false, "Incorrect queueState, should be accepting but not!"); + } std::map> suspendedCoros; { std::unique_lock lock(m_mutex); - accepting_ = false; suspendedCoros = std::move(m_suspendedCoros); } if (!suspendedCoros.empty()) { // We should resume the suspended coroutines so that the coroutines // get a chance to exit cleanly. - onStopResumeCoros(suspendedCoros); + for (auto& [_, coro] : suspendedCoros) + { + if (auto coroPtr = coro.lock()) + { + // We don't allow any new jobs from outside when we are + // stopping, but we should allow new jobs from inside the class. + addJobNoStatusCheck(coroPtr->type_, coroPtr->name_, [coroPtr]() { coroPtr->resume(); }); + } + } } - stopping_ = true; + using namespace std::chrono_literals; jobCounter_.join("JobQueue", 1s, m_journal); { @@ -337,14 +351,12 @@ JobQueue::stop() m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed"); XRPL_ASSERT( nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended"); - stopped_ = true; } -} - -bool -JobQueue::isStopped() const -{ - return stopped_; + auto stopping = QueueState::Stopping; + if (queueState_.compare_exchange_strong(stopping, QueueState::Stopped)) + { + XRPL_ASSERT(false, "Incorrect queueState, should be stopping but not!"); + } } void