Compare commits

...

6 Commits

Author SHA1 Message Date
JCW
9a30560f53 Fix the bug
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-18 09:31:37 +01:00
JCW
6fd30ebde1 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-16 16:19:32 +01:00
JCW
a1f6580e54 Fix the bug
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-16 16:14:43 +01:00
JCW
32a3f0a867 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-08 14:53:58 +01:00
JCW
ed6dcdb10f Add unit test
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-08 14:53:58 +01:00
JCW
3adfa074bc Fix the assertion failure in JobQueue::stop when exiting but there's a suspended coroutine
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-08 14:53:58 +01:00
5 changed files with 187 additions and 66 deletions

View File

@@ -175,12 +175,78 @@ public:
BEAST_EXPECT(*lv == -1); BEAST_EXPECT(*lv == -1);
} }
void
stopJobQueueWhenCoroutineSuspended()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Stop JobQueue when a coroutine is suspended");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
bool started = false;
bool finished = false;
std::optional<bool> shouldStop;
std::condition_variable cv;
std::mutex m;
std::unique_lock<std::mutex> lk(m);
auto coro = env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
started = true;
cv.notify_all();
c->yield();
finished = true;
shouldStop = c->shouldStop();
cv.notify_all();
});
cv.wait_for(lk, 5s, [&]() { return started; });
env.app().getJobQueue().stop();
cv.wait_for(lk, 5s, [&]() { return finished; });
BEAST_EXPECT(finished);
BEAST_EXPECT(shouldStop.has_value() && *shouldStop == true);
}
void
coroutineGetsDestroyedBeforeExecuting()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Coroutine gets destroyed before executing");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
{
auto coro = std::make_shared<JobQueue::Coro>(
Coro_create_t{},
env.app().getJobQueue(),
JobType::jtCLIENT,
"test",
[](auto coro) {
});
}
pass();
}
void void
run() override run() override
{ {
correct_order(); correct_order();
incorrect_order(); incorrect_order();
thread_specific_storage(); thread_specific_storage();
stopJobQueueWhenCoroutineSuspended();
coroutineGetsDestroyedBeforeExecuting();
} }
}; };

View File

@@ -34,17 +34,21 @@ JobQueue::Coro::Coro(
: jq_(jq) : jq_(jq)
, type_(type) , type_(type)
, name_(name) , name_(name)
, running_(false)
, coro_( , coro_(
[this, fn = std::forward<F>(f)]( [this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type& boost::coroutines::asymmetric_coroutine<void>::push_type&
do_yield) { do_yield) {
yield_ = &do_yield; yield_ = &do_yield;
yield(); yield();
fn(shared_from_this()); // self makes Coro alive until this function returns
#ifndef NDEBUG std::shared_ptr<Coro> self;
finished_ = true; if (!shouldStop())
#endif {
self = shared_from_this();
fn(self);
}
state_ = CoroState::Finished;
cv_.notify_all();
}, },
boost::coroutines::attributes(megabytes(1))) boost::coroutines::attributes(megabytes(1)))
{ {
@@ -52,17 +56,36 @@ JobQueue::Coro::Coro(
inline JobQueue::Coro::~Coro() inline JobQueue::Coro::~Coro()
{ {
XRPL_ASSERT(
state_ != CoroState::Running,
"ripple::JobQueue::Coro::~Coro : is not running");
exiting_ = true;
// Resume the coroutine so that it has a chance to clean things up
if (state_ == CoroState::Suspended)
{
resume();
}
#ifndef NDEBUG #ifndef NDEBUG
XRPL_ASSERT(finished_, "ripple::JobQueue::Coro::~Coro : is finished"); XRPL_ASSERT(
state_ == CoroState::Finished,
"ripple::JobQueue::Coro::~Coro : is finished");
#endif #endif
} }
inline void inline void
JobQueue::Coro::yield() const JobQueue::Coro::yield()
{ {
{ {
std::lock_guard lock(jq_.m_mutex); std::lock_guard lock(jq_.m_mutex);
if (shouldStop())
{
return;
}
state_ = CoroState::Suspended;
++jq_.nSuspend_; ++jq_.nSuspend_;
jq_.m_suspendedCoros[this] = weak_from_this();
jq_.cv_.notify_all();
} }
(*yield_)(); (*yield_)();
} }
@@ -70,11 +93,6 @@ JobQueue::Coro::yield() const
inline bool inline bool
JobQueue::Coro::post() JobQueue::Coro::post()
{ {
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp keeps 'this' alive // sp keeps 'this' alive
if (jq_.addJob( if (jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() { resume(); })) type_, name_, [this, sp = shared_from_this()]() { resume(); }))
@@ -82,9 +100,6 @@ JobQueue::Coro::post()
return true; return true;
} }
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all(); cv_.notify_all();
return false; return false;
} }
@@ -94,11 +109,17 @@ JobQueue::Coro::resume()
{ {
{ {
std::lock_guard lk(mutex_run_); std::lock_guard lk(mutex_run_);
running_ = true; if (state_ != CoroState::Suspended)
{
return;
}
state_ = CoroState::Running;
} }
{ {
std::lock_guard lock(jq_.m_mutex); std::lock_guard lock(jq_.m_mutex);
jq_.m_suspendedCoros.erase(this);
--jq_.nSuspend_; --jq_.nSuspend_;
jq_.cv_.notify_all();
} }
auto saved = detail::getLocalValues().release(); auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_); detail::getLocalValues().reset(&lvs_);
@@ -109,9 +130,6 @@ JobQueue::Coro::resume()
coro_(); coro_();
detail::getLocalValues().release(); detail::getLocalValues().release();
detail::getLocalValues().reset(saved); detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
} }
inline bool inline bool
@@ -120,32 +138,11 @@ JobQueue::Coro::runnable() const
return static_cast<bool>(coro_); return static_cast<bool>(coro_);
} }
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void inline void
JobQueue::Coro::join() JobQueue::Coro::join()
{ {
std::unique_lock<std::mutex> lk(mutex_run_); std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; }); cv_.wait(lk, [this]() { return state_ != CoroState::Running; });
} }
} // namespace ripple } // namespace ripple

View File

@@ -60,20 +60,22 @@ public:
/** Coroutines must run to completion. */ /** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro> class Coro : public std::enable_shared_from_this<Coro>
{ {
friend class JobQueue;
private: private:
enum class CoroState { None, Suspended, Running, Finished };
std::atomic_bool exiting_ = false;
detail::LocalValues lvs_; detail::LocalValues lvs_;
JobQueue& jq_; JobQueue& jq_;
JobType type_; JobType type_;
std::string name_; std::string name_;
bool running_; std::atomic<CoroState> state_ = CoroState::None;
std::mutex mutex_; std::mutex mutex_;
std::mutex mutex_run_; std::mutex mutex_run_;
std::condition_variable cv_; std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_; boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_; boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public: public:
// Private: Used in the implementation // Private: Used in the implementation
@@ -97,7 +99,7 @@ public:
post. post.
*/ */
void void
yield() const; yield();
/** Schedule coroutine execution. /** Schedule coroutine execution.
Effects: Effects:
@@ -131,13 +133,13 @@ public:
bool bool
runnable() const; runnable() const;
/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine returns from the user function. */ /** Waits until coroutine returns from the user function. */
void void
join(); join();
/** Returns true if the coroutine should stop executing */
bool
shouldStop() const;
}; };
using JobFunction = std::function<void()>; using JobFunction = std::function<void()>;
@@ -167,6 +169,10 @@ public:
bool bool
addJob(JobType type, std::string const& name, JobHandler&& jobHandler) addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
{ {
if (!accepting_)
{
return false;
}
if (auto optionalCountedJob = if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler))) jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{ {
@@ -249,6 +255,7 @@ private:
std::uint64_t m_lastJob; std::uint64_t m_lastJob;
std::set<Job> m_jobSet; std::set<Job> m_jobSet;
JobCounter jobCounter_; JobCounter jobCounter_;
std::atomic_bool accepting_ = true;
std::atomic_bool stopping_{false}; std::atomic_bool stopping_{false};
std::atomic_bool stopped_{false}; std::atomic_bool stopped_{false};
JobDataMap m_jobData; JobDataMap m_jobData;
@@ -260,6 +267,8 @@ private:
// The number of suspended coroutines // The number of suspended coroutines
int nSuspend_ = 0; int nSuspend_ = 0;
std::map<void*, std::weak_ptr<Coro>> m_suspendedCoros;
Workers m_workers; Workers m_workers;
// Statistics tracking // Statistics tracking
@@ -270,6 +279,25 @@ private:
std::condition_variable cv_; std::condition_variable cv_;
void
onStopResumeCoros(std::map<void*, std::weak_ptr<Coro>>& coros)
{
for (auto& [_, coro] : coros)
{
if (auto coroPtr = coro.lock())
{
if (auto optionalCountedJob =
jobCounter_.wrap([=]() { coroPtr->resume(); }))
{
addRefCountedJob(
coroPtr->type_,
coroPtr->name_,
std::move(*optionalCountedJob));
}
}
}
}
void void
collect(); collect();
JobTypeData& JobTypeData&
@@ -412,6 +440,10 @@ template <class F>
std::shared_ptr<JobQueue::Coro> std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f) JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{ {
if (!accepting_)
{
return nullptr;
}
/* First param is a detail type to make construction private. /* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>). void(std::shared_ptr<Coro>).
@@ -422,7 +454,6 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{ {
// The Coro was not successfully posted. Disable it so it's destructor // The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it. // can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset(); coro.reset();
} }
return coro; return coro;

View File

@@ -26,6 +26,12 @@
namespace ripple { namespace ripple {
bool
JobQueue::Coro::shouldStop() const
{
return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_ || exiting_;
}
JobQueue::JobQueue( JobQueue::JobQueue(
int threadCount, int threadCount,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
@@ -295,6 +301,22 @@ JobQueue::getJobTypeData(JobType type)
void void
JobQueue::stop() JobQueue::stop()
{ {
// Once we stop accepting new jobs, all running coroutines won't be able to
// get suspended and yield() will return immediately, so we can safely
// move m_suspendedCoros, and we can assume that no coroutine will be
// suspended in the future.
std::map<void*, std::weak_ptr<Coro>> suspendedCoros;
{
std::unique_lock lock(m_mutex);
accepting_ = false;
suspendedCoros = std::move(m_suspendedCoros);
}
if (!suspendedCoros.empty())
{
// We should resume the suspended coroutines so that the coroutines
// get a chance to exit cleanly.
onStopResumeCoros(suspendedCoros);
}
stopping_ = true; stopping_ = true;
using namespace std::chrono_literals; using namespace std::chrono_literals;
jobCounter_.join("JobQueue", 1s, m_journal); jobCounter_.join("JobQueue", 1s, m_journal);
@@ -305,8 +327,9 @@ JobQueue::stop()
// `Job::doJob` and the return of `JobQueue::processTask`. That is why // `Job::doJob` and the return of `JobQueue::processTask`. That is why
// we must wait on the condition variable to make these assertions. // we must wait on the condition variable to make these assertions.
std::unique_lock<std::mutex> lock(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait( cv_.wait(lock, [this] {
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); }); return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty();
});
XRPL_ASSERT( XRPL_ASSERT(
m_processCount == 0, m_processCount == 0,
"ripple::JobQueue::stop : all processes completed"); "ripple::JobQueue::stop : all processes completed");

View File

@@ -128,21 +128,17 @@ doRipplePathFind(RPC::JsonContext& context)
// May 2017 // May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest( jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request, request,
[&context]() { [coro = context.coro]() {
// Copying the shared_ptr keeps the coroutine alive up // Capturing the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the // through the return. Otherwise the storage under the
// captured reference could evaporate when we return from // captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but // coro->post().
// will make maintenance easier. // When post() failed, we won't get a thread to let
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro}; // the Coro finish. We should ignore the coroutine and
if (!coroCopy->post()) // let it destruct, as the JobQueu has been signaled to
{ // close, and resuming it manually messes up the internal
// The post() failed, so we won't get a thread to let // state in JobQueue.
// the Coro finish. We'll call Coro::resume() so the coro->post();
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
}, },
context.consumer, context.consumer,
lpLedger, lpLedger,
@@ -150,6 +146,14 @@ doRipplePathFind(RPC::JsonContext& context)
if (request) if (request)
{ {
context.coro->yield(); context.coro->yield();
// Each time after we resume from yield(), we should
// check if cancellation has been requested. It would
// be a lot more elegant if we replace boost coroutine
// with c++ standard coroutine.
if (context.coro->shouldStop())
{
return jvResult;
}
jvResult = request->doStatus(context.params); jvResult = request->doStatus(context.params);
} }