Fix join() race in CoroTaskRunner by replacing bool with counter

When post() is called from within the coroutine body (via JobQueueAwaiter),
two resume operations can overlap: R1 is still running while R2 is queued.
With a boolean running_ flag, R1's cleanup (running_=false) clobbers R2's
pending state, causing join() to return prematurely on ARM64.

Replace bool running_ with int runCount_: post() increments before enqueue,
resume() decrements after completion. join() waits for runCount_==0.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-02-28 13:47:04 +00:00
parent 926d8128af
commit 363870eb34
2 changed files with 45 additions and 38 deletions

View File

@@ -23,7 +23,7 @@
* . (externally or by JobQueueAwaiter)
* .
* +--- (caller calls) -----> post()
* | running_ = true
* | ++runCount_
* | addJob(resume) ----------> job enqueued
* | |
* | [worker picks up]
@@ -36,7 +36,7 @@
* | [coroutine body continues here]
* | |
* | swap out LocalValues
* | running_ = false
* | --runCount_
* | cv_.notify_all()
* v
*
@@ -45,16 +45,17 @@
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
* races cannot resume the coroutine while it is still running.
* (See the race condition discussion in JobQueue.h)
* - mutex_run_ : guards running_ flag; used by join() to wait for completion.
* - mutex_run_ : guards runCount_ counter; used by join() to wait until
* all in-flight resume operations complete.
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
*
* Common Mistakes When Modifying This File
* =========================================
*
* 1. Changing lock ordering.
* resume() acquires locks in this order: mutex_run_ -> jq_.m_mutex -> mutex_.
* Acquiring them in a different order WILL deadlock. Any new code path
* that touches these mutexes must follow the same order.
* resume() acquires locks in this order: jq_.m_mutex -> mutex_ -> mutex_run_.
* post() acquires only mutex_run_. Any new code path that touches these
* mutexes must follow the same order to avoid deadlocks.
*
* 2. Removing the shared_from_this() capture in post().
* The lambda passed to addJob captures [this, sp = shared_from_this()].
@@ -81,7 +82,7 @@
namespace xrpl {
/**
* Construct a CoroTaskRunner. Sets running_ to false; does not
* Construct a CoroTaskRunner. Sets runCount_ to 0; does not
* create the coroutine. Call init() afterwards.
*
* @param jq The JobQueue this coroutine will run on
@@ -93,7 +94,7 @@ inline JobQueue::CoroTaskRunner::CoroTaskRunner(
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), running_(false)
: jq_(jq), type_(type), name_(name), runCount_(0)
{
}
@@ -215,7 +216,7 @@ JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
++runCount_;
}
// sp prevents 'this' from being destroyed while the job is pending
@@ -224,9 +225,9 @@ JobQueue::CoroTaskRunner::post()
return true;
}
// The coroutine will not run. Clean up running_.
// The coroutine will not run. Undo the runCount_ increment.
std::lock_guard lk(mutex_run_);
running_ = false;
--runCount_;
cv_.notify_all();
return false;
}
@@ -235,20 +236,18 @@ JobQueue::CoroTaskRunner::post()
* Resume the coroutine on the current thread.
*
* Steps:
* 1. Set running_ = true (under mutex_run_)
* 2. Decrement nSuspend_ (under jq_.m_mutex)
* 3. Swap in this coroutine's LocalValues for thread-local isolation
* 4. Resume the coroutine handle (under mutex_)
* 5. Swap out LocalValues, restoring the thread's previous state
* 6. Set running_ = false and notify join() waiters
* 1. Decrement nSuspend_ (under jq_.m_mutex)
* 2. Swap in this coroutine's LocalValues for thread-local isolation
* 3. Resume the coroutine handle (under mutex_)
* 4. Swap out LocalValues, restoring the thread's previous state
* 5. Decrement runCount_ and notify join() waiters
*
* Note: runCount_ is NOT incremented here — post() already did that.
* This ensures join() stays blocked for the entire post→resume lifetime.
*/
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
@@ -273,7 +272,7 @@ JobQueue::CoroTaskRunner::resume()
[[maybe_unused]] auto completed = std::move(task_);
}
std::lock_guard lk(mutex_run_);
running_ = false;
--runCount_;
cv_.notify_all();
}
@@ -317,14 +316,14 @@ JobQueue::CoroTaskRunner::expectEarlyExit()
}
/**
* Block until the coroutine finishes its current execution slice.
* Uses cv_ + mutex_run_ to wait until running_ == false.
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
*/
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
cv_.wait(lk, [this]() { return runCount_ == 0; });
}
} // namespace xrpl

View File

@@ -136,7 +136,7 @@ public:
* | - jq_ : JobQueue& |
* | - type_ : JobType |
* | - name_ : std::string |
* | - running_ : bool |
* | - runCount_ : int (in-flight resumes) |
* | - mutex_ : std::mutex (coroutine guard) |
* | - mutex_run_ : std::mutex (join guard) |
* | - cv_ : condition_variable |
@@ -177,10 +177,10 @@ public:
* | +-- task_ = f(shared_from_this())
* | [coroutine created, suspended at initial_suspend]
* +-- post()
* | +-- ++runCount_
* | +-- addJob(type_, [resume]{})
* | resume()
* | |
* | +-- running_ = true
* | +-- --nSuspend_
* | +-- swap in LocalValues
* | +-- task_.handle().resume()
@@ -190,19 +190,20 @@ public:
* | | +-- ++nSuspend_
* | | [coroutine suspends]
* | +-- swap out LocalValues
* | +-- running_ = false
* | +-- --runCount_
* | +-- cv_.notify_all()
* |
* post() <-- called externally or by JobQueueAwaiter
* +-- ++runCount_
* +-- addJob(type_, [resume]{})
* resume()
* |
* +-- [coroutine body continues]
* +-- co_return
* +-- running_ = false
* +-- --runCount_
* +-- cv_.notify_all()
* join()
* +-- cv_.wait([]{!running_})
* +-- cv_.wait([]{runCount_ == 0})
* +-- [done]
*
* Usage Examples
@@ -327,9 +328,16 @@ public:
// Human-readable name for this coroutine job (for logging).
std::string name_;
// True while the coroutine is actively executing on a thread.
// Guarded by mutex_run_. join() blocks until this is false.
bool running_;
// Number of in-flight resume operations (pending + active).
// Incremented by post(), decremented when resume() finishes.
// Guarded by mutex_run_. join() blocks until this reaches 0.
//
// A counter (not a bool) is needed because post() can be called
// from within the coroutine body (e.g. via JobQueueAwaiter),
// enqueuing a second resume while the first is still running.
// A bool would be clobbered: R2.post() sets true, then R1's
// cleanup sets false — losing the fact that R2 is still pending.
int runCount_;
// Guards task_.handle().resume() to prevent the coroutine from
// running on two threads simultaneously. Handles the race where
@@ -337,11 +345,11 @@ public:
// suspended (post-before-suspend pattern).
std::mutex mutex_;
// Guards running_ flag. Used with cv_ for join() to wait
// until the coroutine finishes its current execution slice.
// Guards runCount_. Used with cv_ for join() to wait
// until all pending/active resume operations complete.
std::mutex mutex_run_;
// Notified when running_ transitions to false, allowing
// Notified when runCount_ reaches zero, allowing
// join() waiters to wake up.
std::condition_variable cv_;
@@ -494,8 +502,8 @@ public:
expectEarlyExit();
/**
* Block until the coroutine finishes its current execution slice.
* Uses cv_ + mutex_run_ to wait until running_ == false.
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
* Warning: deadlocks if the coroutine is suspended and never posted.
*/
void