Fix multithreading bugs

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-09-18 22:29:00 +01:00
parent a70e60e0d8
commit 5f3b3a6a1e
3 changed files with 33 additions and 7 deletions

View File

@@ -121,6 +121,9 @@ class JobQueue_test : public beast::unit_test::suite
// Wait for the Job to run and yield. // Wait for the Job to run and yield.
coro->join(); coro->join();
while (yieldCount == 0)
; // We should wait for the job to start and yield
// Now resume until the Coro says it is done. // Now resume until the Coro says it is done.
int old = yieldCount; int old = yieldCount;
while (coro->runnable()) while (coro->runnable())

View File

@@ -47,8 +47,11 @@ JobQueue::Coro::Coro(
self = shared_from_this(); self = shared_from_this();
fn(self); fn(self);
} }
{
std::lock_guard stateLock{mutex_run_};
state_ = CoroState::Finished; state_ = CoroState::Finished;
cv_.notify_all(); cv_.notify_all();
}
}, },
boost::coroutines::attributes(megabytes(1))) boost::coroutines::attributes(megabytes(1)))
{ {
@@ -56,6 +59,7 @@ JobQueue::Coro::Coro(
inline JobQueue::Coro::~Coro() inline JobQueue::Coro::~Coro()
{ {
std::unique_lock stateLock{mutex_run_};
XRPL_ASSERT( XRPL_ASSERT(
state_ != CoroState::Running, state_ != CoroState::Running,
"ripple::JobQueue::Coro::~Coro : is not running"); "ripple::JobQueue::Coro::~Coro : is not running");
@@ -63,7 +67,9 @@ inline JobQueue::Coro::~Coro()
// Resume the coroutine so that it has a chance to clean things up // Resume the coroutine so that it has a chance to clean things up
if (state_ == CoroState::Suspended) if (state_ == CoroState::Suspended)
{ {
stateLock.unlock();
resume(); resume();
stateLock.lock();
} }
#ifndef NDEBUG #ifndef NDEBUG
@@ -79,10 +85,13 @@ JobQueue::Coro::yield()
{ {
std::lock_guard lock(jq_.m_mutex); std::lock_guard lock(jq_.m_mutex);
if (shouldStop()) if (shouldStop())
{
return; return;
}
{
std::lock_guard stateLock{mutex_run_};
state_ = CoroState::Suspended; state_ = CoroState::Suspended;
cv_.notify_all();
}
++jq_.nSuspend_; ++jq_.nSuspend_;
jq_.m_suspendedCoros[this] = weak_from_this(); jq_.m_suspendedCoros[this] = weak_from_this();
jq_.cv_.notify_all(); jq_.cv_.notify_all();
@@ -93,6 +102,13 @@ JobQueue::Coro::yield()
inline bool inline bool
JobQueue::Coro::post() JobQueue::Coro::post()
{ {
#if !defined(NDEBUG)
{
std::lock_guard lk(mutex_run_);
XRPL_ASSERT(state_ == CoroState::Suspended, "JobQueue::Coro::post: coroutine should be suspended!");
}
#endif
// sp keeps 'this' alive // sp keeps 'this' alive
if (jq_.addJob( if (jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() { resume(); })) type_, name_, [this, sp = shared_from_this()]() { resume(); }))
@@ -114,6 +130,7 @@ JobQueue::Coro::resume()
return; return;
} }
state_ = CoroState::Running; state_ = CoroState::Running;
cv_.notify_all();
} }
{ {
std::lock_guard lock(jq_.m_mutex); std::lock_guard lock(jq_.m_mutex);
@@ -135,7 +152,13 @@ JobQueue::Coro::resume()
inline bool inline bool
JobQueue::Coro::runnable() const JobQueue::Coro::runnable() const
{ {
return static_cast<bool>(coro_); std::unique_lock<std::mutex> lk(mutex_run_);
// There's an edge case where the coroutine has updated the status
// to Finished but the function hasn't exited and therefore, coro_ is
// still valid. However, the coroutine is not technically runnable in this
// case, because the coroutine is about to exit and static_cast<bool>(coro_)
// is going to be false.
return static_cast<bool>(coro_) && state_ != CoroState::Finished;
} }
inline void inline void

View File

@@ -70,9 +70,9 @@ public:
JobQueue& jq_; JobQueue& jq_;
JobType type_; JobType type_;
std::string name_; std::string name_;
std::atomic<CoroState> state_ = CoroState::None; CoroState state_ = CoroState::None;
std::mutex mutex_; std::mutex mutex_;
std::mutex mutex_run_; mutable std::mutex mutex_run_;
std::condition_variable cv_; std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_; boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_; boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;