diff --git a/include/xrpl/core/CoroTaskRunner.ipp b/include/xrpl/core/CoroTaskRunner.ipp index a5813473dd..49f4f88c14 100644 --- a/include/xrpl/core/CoroTaskRunner.ipp +++ b/include/xrpl/core/CoroTaskRunner.ipp @@ -23,7 +23,7 @@ * . (externally or by JobQueueAwaiter) * . * +--- (caller calls) -----> post() - * | running_ = true + * | ++runCount_ * | addJob(resume) ----------> job enqueued * | | * | [worker picks up] @@ -36,7 +36,7 @@ * | [coroutine body continues here] * | | * | swap out LocalValues - * | running_ = false + * | --runCount_ * | cv_.notify_all() * v * @@ -45,16 +45,17 @@ * - mutex_ : guards task_.handle().resume() so that post()-before-suspend * races cannot resume the coroutine while it is still running. * (See the race condition discussion in JobQueue.h) - * - mutex_run_ : guards running_ flag; used by join() to wait for completion. + * - mutex_run_ : guards runCount_ counter; used by join() to wait until + * all in-flight resume operations complete. * - jq_.m_mutex: guards nSuspend_ increments/decrements. * * Common Mistakes When Modifying This File * ========================================= * * 1. Changing lock ordering. - * resume() acquires locks in this order: mutex_run_ -> jq_.m_mutex -> mutex_. - * Acquiring them in a different order WILL deadlock. Any new code path - * that touches these mutexes must follow the same order. + * resume() acquires locks in this order: jq_.m_mutex -> mutex_ -> mutex_run_. + * post() acquires only mutex_run_. Any new code path that touches these + * mutexes must follow the same order to avoid deadlocks. * * 2. Removing the shared_from_this() capture in post(). * The lambda passed to addJob captures [this, sp = shared_from_this()]. @@ -81,7 +82,7 @@ namespace xrpl { /** - * Construct a CoroTaskRunner. Sets running_ to false; does not + * Construct a CoroTaskRunner. Sets runCount_ to 0; does not * create the coroutine. Call init() afterwards. * * @param jq The JobQueue this coroutine will run on @@ -93,7 +94,7 @@ inline JobQueue::CoroTaskRunner::CoroTaskRunner( JobQueue& jq, JobType type, std::string const& name) - : jq_(jq), type_(type), name_(name), running_(false) + : jq_(jq), type_(type), name_(name), runCount_(0) { } @@ -215,7 +216,7 @@ JobQueue::CoroTaskRunner::post() { { std::lock_guard lk(mutex_run_); - running_ = true; + ++runCount_; } // sp prevents 'this' from being destroyed while the job is pending @@ -224,9 +225,9 @@ JobQueue::CoroTaskRunner::post() return true; } - // The coroutine will not run. Clean up running_. + // The coroutine will not run. Undo the runCount_ increment. std::lock_guard lk(mutex_run_); - running_ = false; + --runCount_; cv_.notify_all(); return false; } @@ -235,20 +236,18 @@ JobQueue::CoroTaskRunner::post() * Resume the coroutine on the current thread. * * Steps: - * 1. Set running_ = true (under mutex_run_) - * 2. Decrement nSuspend_ (under jq_.m_mutex) - * 3. Swap in this coroutine's LocalValues for thread-local isolation - * 4. Resume the coroutine handle (under mutex_) - * 5. Swap out LocalValues, restoring the thread's previous state - * 6. Set running_ = false and notify join() waiters + * 1. Decrement nSuspend_ (under jq_.m_mutex) + * 2. Swap in this coroutine's LocalValues for thread-local isolation + * 3. Resume the coroutine handle (under mutex_) + * 4. Swap out LocalValues, restoring the thread's previous state + * 5. Decrement runCount_ and notify join() waiters + * + * Note: runCount_ is NOT incremented here — post() already did that. + * This ensures join() stays blocked for the entire post→resume lifetime. */ inline void JobQueue::CoroTaskRunner::resume() { - { - std::lock_guard lk(mutex_run_); - running_ = true; - } { std::lock_guard lock(jq_.m_mutex); --jq_.nSuspend_; @@ -273,7 +272,7 @@ JobQueue::CoroTaskRunner::resume() [[maybe_unused]] auto completed = std::move(task_); } std::lock_guard lk(mutex_run_); - running_ = false; + --runCount_; cv_.notify_all(); } @@ -317,14 +316,14 @@ JobQueue::CoroTaskRunner::expectEarlyExit() } /** - * Block until the coroutine finishes its current execution slice. - * Uses cv_ + mutex_run_ to wait until running_ == false. + * Block until all pending/active resume operations complete. + * Uses cv_ + mutex_run_ to wait until runCount_ reaches 0. */ inline void JobQueue::CoroTaskRunner::join() { std::unique_lock lk(mutex_run_); - cv_.wait(lk, [this]() { return running_ == false; }); + cv_.wait(lk, [this]() { return runCount_ == 0; }); } } // namespace xrpl diff --git a/include/xrpl/core/JobQueue.h b/include/xrpl/core/JobQueue.h index c895a61094..00f393e71a 100644 --- a/include/xrpl/core/JobQueue.h +++ b/include/xrpl/core/JobQueue.h @@ -136,7 +136,7 @@ public: * | - jq_ : JobQueue& | * | - type_ : JobType | * | - name_ : std::string | - * | - running_ : bool | + * | - runCount_ : int (in-flight resumes) | * | - mutex_ : std::mutex (coroutine guard) | * | - mutex_run_ : std::mutex (join guard) | * | - cv_ : condition_variable | @@ -177,10 +177,10 @@ public: * | +-- task_ = f(shared_from_this()) * | [coroutine created, suspended at initial_suspend] * +-- post() + * | +-- ++runCount_ * | +-- addJob(type_, [resume]{}) * | resume() * | | - * | +-- running_ = true * | +-- --nSuspend_ * | +-- swap in LocalValues * | +-- task_.handle().resume() @@ -190,19 +190,20 @@ public: * | | +-- ++nSuspend_ * | | [coroutine suspends] * | +-- swap out LocalValues - * | +-- running_ = false + * | +-- --runCount_ * | +-- cv_.notify_all() * | * post() <-- called externally or by JobQueueAwaiter + * +-- ++runCount_ * +-- addJob(type_, [resume]{}) * resume() * | * +-- [coroutine body continues] * +-- co_return - * +-- running_ = false + * +-- --runCount_ * +-- cv_.notify_all() * join() - * +-- cv_.wait([]{!running_}) + * +-- cv_.wait([]{runCount_ == 0}) * +-- [done] * * Usage Examples @@ -327,9 +328,16 @@ public: // Human-readable name for this coroutine job (for logging). std::string name_; - // True while the coroutine is actively executing on a thread. - // Guarded by mutex_run_. join() blocks until this is false. - bool running_; + // Number of in-flight resume operations (pending + active). + // Incremented by post(), decremented when resume() finishes. + // Guarded by mutex_run_. join() blocks until this reaches 0. + // + // A counter (not a bool) is needed because post() can be called + // from within the coroutine body (e.g. via JobQueueAwaiter), + // enqueuing a second resume while the first is still running. + // A bool would be clobbered: R2.post() sets true, then R1's + // cleanup sets false — losing the fact that R2 is still pending. + int runCount_; // Guards task_.handle().resume() to prevent the coroutine from // running on two threads simultaneously. Handles the race where @@ -337,11 +345,11 @@ public: // suspended (post-before-suspend pattern). std::mutex mutex_; - // Guards running_ flag. Used with cv_ for join() to wait - // until the coroutine finishes its current execution slice. + // Guards runCount_. Used with cv_ for join() to wait + // until all pending/active resume operations complete. std::mutex mutex_run_; - // Notified when running_ transitions to false, allowing + // Notified when runCount_ reaches zero, allowing // join() waiters to wake up. std::condition_variable cv_; @@ -494,8 +502,8 @@ public: expectEarlyExit(); /** - * Block until the coroutine finishes its current execution slice. - * Uses cv_ + mutex_run_ to wait until running_ == false. + * Block until all pending/active resume operations complete. + * Uses cv_ + mutex_run_ to wait until runCount_ reaches 0. * Warning: deadlocks if the coroutine is suspended and never posted. */ void