Fix the assertion failure in JobQueue::stop when exiting but there's a suspended coroutine

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-09-08 12:58:26 +01:00
parent 9fe0a154f1
commit 3adfa074bc
4 changed files with 86 additions and 43 deletions

View File

@@ -41,7 +41,10 @@ JobQueue::Coro::Coro(
do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
if (!shouldStop())
{
fn(shared_from_this());
}
#ifndef NDEBUG
finished_ = true;
#endif
@@ -58,11 +61,17 @@ inline JobQueue::Coro::~Coro()
}
inline void
JobQueue::Coro::yield() const
JobQueue::Coro::yield()
{
{
std::lock_guard lock(jq_.m_mutex);
if (shouldStop())
{
return;
}
++jq_.nSuspend_;
jq_.m_suspendedCoros[this] = weak_from_this();
jq_.cv_.notify_all();
}
(*yield_)();
}
@@ -99,6 +108,8 @@ JobQueue::Coro::resume()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
jq_.m_suspendedCoros.erase(this);
jq_.cv_.notify_all();
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
@@ -120,27 +131,6 @@ JobQueue::Coro::runnable() const
return static_cast<bool>(coro_);
}
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void
JobQueue::Coro::join()
{

View File

@@ -60,6 +60,7 @@ public:
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
friend class JobQueue;
private:
detail::LocalValues lvs_;
JobQueue& jq_;
@@ -97,7 +98,7 @@ public:
post.
*/
void
yield() const;
yield();
/** Schedule coroutine execution.
Effects:
@@ -131,13 +132,13 @@ public:
bool
runnable() const;
/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine returns from the user function. */
void
join();
/** Returns true if the coroutine should stop executing */
bool
shouldStop() const;
};
using JobFunction = std::function<void()>;
@@ -167,6 +168,10 @@ public:
bool
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
{
if (!accepting_)
{
return false;
}
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
@@ -249,6 +254,7 @@ private:
std::uint64_t m_lastJob;
std::set<Job> m_jobSet;
JobCounter jobCounter_;
std::atomic_bool accepting_ = true;
std::atomic_bool stopping_{false};
std::atomic_bool stopped_{false};
JobDataMap m_jobData;
@@ -260,6 +266,8 @@ private:
// The number of suspended coroutines
int nSuspend_ = 0;
std::map<void*, std::weak_ptr<Coro>> m_suspendedCoros;
Workers m_workers;
// Statistics tracking
@@ -270,6 +278,22 @@ private:
std::condition_variable cv_;
void
onStopResumeCoros(std::map<void*, std::weak_ptr<Coro>>& coros)
{
for (auto& [_, coro] : coros)
{
if (auto coroPtr = coro.lock())
{
if (auto optionalCountedJob =
jobCounter_.wrap([=]() { coroPtr->resume(); }))
{
addRefCountedJob(coroPtr->type_, coroPtr->name_, std::move(*optionalCountedJob));
}
}
}
}
void
collect();
JobTypeData&
@@ -412,6 +436,10 @@ template <class F>
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
if (!accepting_)
{
return nullptr;
}
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
@@ -422,7 +450,6 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
// The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
return coro;

View File

@@ -26,6 +26,12 @@
namespace ripple {
bool
JobQueue::Coro::shouldStop() const
{
return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_;
}
JobQueue::JobQueue(
int threadCount,
beast::insight::Collector::ptr const& collector,
@@ -295,6 +301,22 @@ JobQueue::getJobTypeData(JobType type)
void
JobQueue::stop()
{
// Once we stop accepting new jobs, all running coroutines won't be able to
// get suspended and yield() will return immediately, so we can safely
// move m_suspendedCoros, and we can assume that no coroutine will be
// suspended in the future.
std::map<void*, std::weak_ptr<Coro>> suspendedCoros;
{
std::unique_lock lock(m_mutex);
accepting_ = false;
suspendedCoros = std::move(m_suspendedCoros);
}
if (!suspendedCoros.empty())
{
// We should resume the suspended coroutines so that the coroutines
// get a chance to exit cleanly.
onStopResumeCoros(suspendedCoros);
}
stopping_ = true;
using namespace std::chrono_literals;
jobCounter_.join("JobQueue", 1s, m_journal);
@@ -306,7 +328,7 @@ JobQueue::stop()
// we must wait on the condition variable to make these assertions.
std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait(
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
lock, [this] { return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty(); });
XRPL_ASSERT(
m_processCount == 0,
"ripple::JobQueue::stop : all processes completed");

View File

@@ -128,21 +128,17 @@ doRipplePathFind(RPC::JsonContext& context)
// May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
[coro = context.coro]() {
// Capturing the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
// coro->post().
// When post() failed, we won't get a thread to let
// the Coro finish. We should ignore the coroutine and
// let it destruct, as the JobQueu has been signaled to
// close, and resuming it manually messes up the internal
// state in JobQueue.
coro->post();
},
context.consumer,
lpLedger,
@@ -150,6 +146,14 @@ doRipplePathFind(RPC::JsonContext& context)
if (request)
{
context.coro->yield();
// Each time after we resume from yield(), we should
// check if cancellation has been requested. It would
// be a lot more elegant if we replace boost coroutine
// with c++ standard coroutine.
if (context.coro->shouldStop())
{
return jvResult;
}
jvResult = request->doStatus(context.params);
}