From ab52fde56e4a3b783af5893977e737d4743dce84 Mon Sep 17 00:00:00 2001 From: JCW Date: Thu, 18 Sep 2025 23:00:35 +0100 Subject: [PATCH] Fix multithreading bugs Signed-off-by: JCW --- src/test/core/Coroutine_test.cpp | 13 +++++++++++-- src/test/core/JobQueue_test.cpp | 6 +++--- src/xrpld/core/Coro.ipp | 13 +------------ src/xrpld/core/JobQueue.h | 10 +++++++--- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp index 6db3f7ee17..2dca74f811 100644 --- a/src/test/core/Coroutine_test.cpp +++ b/src/test/core/Coroutine_test.cpp @@ -101,12 +101,21 @@ public: })); gate g; - env.app().getJobQueue().postCoro( + gate gStart; + auto coro = env.app().getJobQueue().postCoro( jtCLIENT, "Coroutine-Test", [&](auto const& c) { - c->post(); + gStart.signal(); c->yield(); g.signal(); }); + + // Wait for the coroutine to start. + BEAST_EXPECT(gStart.wait_for(5s)); + + BEAST_EXPECT(coro->state() == JobQueue::Coro::CoroState::Suspended); + // Post the coroutine. + coro->post(); + BEAST_EXPECT(g.wait_for(5s)); } diff --git a/src/test/core/JobQueue_test.cpp b/src/test/core/JobQueue_test.cpp index d0b8661714..66d24bc4c9 100644 --- a/src/test/core/JobQueue_test.cpp +++ b/src/test/core/JobQueue_test.cpp @@ -118,12 +118,12 @@ class JobQueue_test : public beast::unit_test::suite return; } - // Wait for the Job to run and yield. - coro->join(); - while (yieldCount == 0) ; // We should wait for the job to start and yield + // Wait for the Job to run and yield. + coro->join(); + // Now resume until the Coro says it is done. int old = yieldCount; while (coro->runnable()) diff --git a/src/xrpld/core/Coro.ipp b/src/xrpld/core/Coro.ipp index 19ae6cc9bf..1c2eac64a6 100644 --- a/src/xrpld/core/Coro.ipp +++ b/src/xrpld/core/Coro.ipp @@ -48,7 +48,6 @@ JobQueue::Coro::Coro( fn(self); } { - std::lock_guard stateLock{mutex_run_}; state_ = CoroState::Finished; cv_.notify_all(); } @@ -59,7 +58,6 @@ JobQueue::Coro::Coro( inline JobQueue::Coro::~Coro() { - std::unique_lock stateLock{mutex_run_}; XRPL_ASSERT( state_ != CoroState::Running, "ripple::JobQueue::Coro::~Coro : is not running"); @@ -67,9 +65,7 @@ inline JobQueue::Coro::~Coro() // Resume the coroutine so that it has a chance to clean things up if (state_ == CoroState::Suspended) { - stateLock.unlock(); resume(); - stateLock.lock(); } #ifndef NDEBUG @@ -102,12 +98,7 @@ JobQueue::Coro::yield() inline bool JobQueue::Coro::post() { -#if !defined(NDEBUG) - { - std::lock_guard lk(mutex_run_); - XRPL_ASSERT(state_ == CoroState::Suspended, "JobQueue::Coro::post: coroutine should be suspended!"); - } -#endif + XRPL_ASSERT(state_ == CoroState::Suspended, "JobQueue::Coro::post: coroutine should be suspended!"); // sp keeps 'this' alive if (jq_.addJob( @@ -124,7 +115,6 @@ inline void JobQueue::Coro::resume() { { - std::lock_guard lk(mutex_run_); if (state_ != CoroState::Suspended) { return; @@ -152,7 +142,6 @@ JobQueue::Coro::resume() inline bool JobQueue::Coro::runnable() const { - std::unique_lock lk(mutex_run_); // There's an edge case where the coroutine has updated the status // to Finished but the function hasn't exited and therefore, coro_ is // still valid. However, the coroutine is not technically runnable in this diff --git a/src/xrpld/core/JobQueue.h b/src/xrpld/core/JobQueue.h index 43b5fd9d5a..6f5b7d95c4 100644 --- a/src/xrpld/core/JobQueue.h +++ b/src/xrpld/core/JobQueue.h @@ -62,17 +62,18 @@ public: { friend class JobQueue; - private: + public: enum class CoroState { None, Suspended, Running, Finished }; + private: std::atomic_bool exiting_ = false; detail::LocalValues lvs_; JobQueue& jq_; JobType type_; std::string name_; - CoroState state_ = CoroState::None; + std::atomic state_ = CoroState::None; std::mutex mutex_; - mutable std::mutex mutex_run_; + std::mutex mutex_run_; std::condition_variable cv_; boost::coroutines::asymmetric_coroutine::pull_type coro_; boost::coroutines::asymmetric_coroutine::push_type* yield_; @@ -129,6 +130,9 @@ public: void resume(); + CoroState + state() const { return state_; } + /** Returns true if the Coro is still runnable (has not returned). */ bool runnable() const;