From 3adfa074bc995b4001edca74669a094b2a19bfe0 Mon Sep 17 00:00:00 2001 From: JCW Date: Mon, 8 Sep 2025 12:58:26 +0100 Subject: [PATCH] Fix the assertion failure in JobQueue::stop when exiting but there's a suspended coroutine Signed-off-by: JCW --- src/xrpld/core/Coro.ipp | 36 ++++++++------------- src/xrpld/core/JobQueue.h | 39 +++++++++++++++++++---- src/xrpld/core/detail/JobQueue.cpp | 24 +++++++++++++- src/xrpld/rpc/handlers/RipplePathFind.cpp | 30 +++++++++-------- 4 files changed, 86 insertions(+), 43 deletions(-) diff --git a/src/xrpld/core/Coro.ipp b/src/xrpld/core/Coro.ipp index 5901e07c68..bb6fa16ff4 100644 --- a/src/xrpld/core/Coro.ipp +++ b/src/xrpld/core/Coro.ipp @@ -41,7 +41,10 @@ JobQueue::Coro::Coro( do_yield) { yield_ = &do_yield; yield(); - fn(shared_from_this()); + if (!shouldStop()) + { + fn(shared_from_this()); + } #ifndef NDEBUG finished_ = true; #endif @@ -58,11 +61,17 @@ inline JobQueue::Coro::~Coro() } inline void -JobQueue::Coro::yield() const +JobQueue::Coro::yield() { { std::lock_guard lock(jq_.m_mutex); + if (shouldStop()) + { + return; + } ++jq_.nSuspend_; + jq_.m_suspendedCoros[this] = weak_from_this(); + jq_.cv_.notify_all(); } (*yield_)(); } @@ -99,6 +108,8 @@ JobQueue::Coro::resume() { std::lock_guard lock(jq_.m_mutex); --jq_.nSuspend_; + jq_.m_suspendedCoros.erase(this); + jq_.cv_.notify_all(); } auto saved = detail::getLocalValues().release(); detail::getLocalValues().reset(&lvs_); @@ -120,27 +131,6 @@ JobQueue::Coro::runnable() const return static_cast(coro_); } -inline void -JobQueue::Coro::expectEarlyExit() -{ -#ifndef NDEBUG - if (!finished_) -#endif - { - // expectEarlyExit() must only ever be called from outside the - // Coro's stack. It you're inside the stack you can simply return - // and be done. - // - // That said, since we're outside the Coro's stack, we need to - // decrement the nSuspend that the Coro's call to yield caused. - std::lock_guard lock(jq_.m_mutex); - --jq_.nSuspend_; -#ifndef NDEBUG - finished_ = true; -#endif - } -} - inline void JobQueue::Coro::join() { diff --git a/src/xrpld/core/JobQueue.h b/src/xrpld/core/JobQueue.h index eda956c019..883040bba4 100644 --- a/src/xrpld/core/JobQueue.h +++ b/src/xrpld/core/JobQueue.h @@ -60,6 +60,7 @@ public: /** Coroutines must run to completion. */ class Coro : public std::enable_shared_from_this { + friend class JobQueue; private: detail::LocalValues lvs_; JobQueue& jq_; @@ -97,7 +98,7 @@ public: post. */ void - yield() const; + yield(); /** Schedule coroutine execution. Effects: @@ -131,13 +132,13 @@ public: bool runnable() const; - /** Once called, the Coro allows early exit without an assert. */ - void - expectEarlyExit(); - /** Waits until coroutine returns from the user function. */ void join(); + + /** Returns true if the coroutine should stop executing */ + bool + shouldStop() const; }; using JobFunction = std::function; @@ -167,6 +168,10 @@ public: bool addJob(JobType type, std::string const& name, JobHandler&& jobHandler) { + if (!accepting_) + { + return false; + } if (auto optionalCountedJob = jobCounter_.wrap(std::forward(jobHandler))) { @@ -249,6 +254,7 @@ private: std::uint64_t m_lastJob; std::set m_jobSet; JobCounter jobCounter_; + std::atomic_bool accepting_ = true; std::atomic_bool stopping_{false}; std::atomic_bool stopped_{false}; JobDataMap m_jobData; @@ -260,6 +266,8 @@ private: // The number of suspended coroutines int nSuspend_ = 0; + std::map> m_suspendedCoros; + Workers m_workers; // Statistics tracking @@ -270,6 +278,22 @@ private: std::condition_variable cv_; + void + onStopResumeCoros(std::map>& coros) + { + for (auto& [_, coro] : coros) + { + if (auto coroPtr = coro.lock()) + { + if (auto optionalCountedJob = + jobCounter_.wrap([=]() { coroPtr->resume(); })) + { + addRefCountedJob(coroPtr->type_, coroPtr->name_, std::move(*optionalCountedJob)); + } + } + } + } + void collect(); JobTypeData& @@ -412,6 +436,10 @@ template std::shared_ptr JobQueue::postCoro(JobType t, std::string const& name, F&& f) { + if (!accepting_) + { + return nullptr; + } /* First param is a detail type to make construction private. Last param is the function the coroutine runs. Signature of void(std::shared_ptr). @@ -422,7 +450,6 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f) { // The Coro was not successfully posted. Disable it so it's destructor // can run with no negative side effects. Then destroy it. - coro->expectEarlyExit(); coro.reset(); } return coro; diff --git a/src/xrpld/core/detail/JobQueue.cpp b/src/xrpld/core/detail/JobQueue.cpp index 1ea1df51ab..1e446a32d1 100644 --- a/src/xrpld/core/detail/JobQueue.cpp +++ b/src/xrpld/core/detail/JobQueue.cpp @@ -26,6 +26,12 @@ namespace ripple { +bool +JobQueue::Coro::shouldStop() const +{ + return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_; +} + JobQueue::JobQueue( int threadCount, beast::insight::Collector::ptr const& collector, @@ -295,6 +301,22 @@ JobQueue::getJobTypeData(JobType type) void JobQueue::stop() { + // Once we stop accepting new jobs, all running coroutines won't be able to + // get suspended and yield() will return immediately, so we can safely + // move m_suspendedCoros, and we can assume that no coroutine will be + // suspended in the future. + std::map> suspendedCoros; + { + std::unique_lock lock(m_mutex); + accepting_ = false; + suspendedCoros = std::move(m_suspendedCoros); + } + if (!suspendedCoros.empty()) + { + // We should resume the suspended coroutines so that the coroutines + // get a chance to exit cleanly. + onStopResumeCoros(suspendedCoros); + } stopping_ = true; using namespace std::chrono_literals; jobCounter_.join("JobQueue", 1s, m_journal); @@ -306,7 +328,7 @@ JobQueue::stop() // we must wait on the condition variable to make these assertions. std::unique_lock lock(m_mutex); cv_.wait( - lock, [this] { return m_processCount == 0 && m_jobSet.empty(); }); + lock, [this] { return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty(); }); XRPL_ASSERT( m_processCount == 0, "ripple::JobQueue::stop : all processes completed"); diff --git a/src/xrpld/rpc/handlers/RipplePathFind.cpp b/src/xrpld/rpc/handlers/RipplePathFind.cpp index abe1b3b9bb..c6e48d562d 100644 --- a/src/xrpld/rpc/handlers/RipplePathFind.cpp +++ b/src/xrpld/rpc/handlers/RipplePathFind.cpp @@ -128,21 +128,17 @@ doRipplePathFind(RPC::JsonContext& context) // May 2017 jvResult = context.app.getPathRequests().makeLegacyPathRequest( request, - [&context]() { - // Copying the shared_ptr keeps the coroutine alive up + [coro = context.coro]() { + // Capturing the shared_ptr keeps the coroutine alive up // through the return. Otherwise the storage under the // captured reference could evaporate when we return from - // coroCopy->resume(). This is not strictly necessary, but - // will make maintenance easier. - std::shared_ptr coroCopy{context.coro}; - if (!coroCopy->post()) - { - // The post() failed, so we won't get a thread to let - // the Coro finish. We'll call Coro::resume() so the - // Coro can finish on our thread. Otherwise the - // application will hang on shutdown. - coroCopy->resume(); - } + // coro->post(). + // When post() failed, we won't get a thread to let + // the Coro finish. We should ignore the coroutine and + // let it destruct, as the JobQueu has been signaled to + // close, and resuming it manually messes up the internal + // state in JobQueue. + coro->post(); }, context.consumer, lpLedger, @@ -150,6 +146,14 @@ doRipplePathFind(RPC::JsonContext& context) if (request) { context.coro->yield(); + // Each time after we resume from yield(), we should + // check if cancellation has been requested. It would + // be a lot more elegant if we replace boost coroutine + // with c++ standard coroutine. + if (context.coro->shouldStop()) + { + return jvResult; + } jvResult = request->doStatus(context.params); }