Fix multithreading bugs

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-09-18 23:00:35 +01:00
parent 671436c033
commit ab52fde56e
4 changed files with 22 additions and 20 deletions

View File

@@ -101,12 +101,21 @@ public:
}));
gate g;
env.app().getJobQueue().postCoro(
gate gStart;
auto coro = env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
c->post();
gStart.signal();
c->yield();
g.signal();
});
// Wait for the coroutine to start.
BEAST_EXPECT(gStart.wait_for(5s));
BEAST_EXPECT(coro->state() == JobQueue::Coro::CoroState::Suspended);
// Post the coroutine.
coro->post();
BEAST_EXPECT(g.wait_for(5s));
}

View File

@@ -118,12 +118,12 @@ class JobQueue_test : public beast::unit_test::suite
return;
}
// Wait for the Job to run and yield.
coro->join();
while (yieldCount == 0)
; // We should wait for the job to start and yield
// Wait for the Job to run and yield.
coro->join();
// Now resume until the Coro says it is done.
int old = yieldCount;
while (coro->runnable())

View File

@@ -48,7 +48,6 @@ JobQueue::Coro::Coro(
fn(self);
}
{
std::lock_guard stateLock{mutex_run_};
state_ = CoroState::Finished;
cv_.notify_all();
}
@@ -59,7 +58,6 @@ JobQueue::Coro::Coro(
inline JobQueue::Coro::~Coro()
{
std::unique_lock stateLock{mutex_run_};
XRPL_ASSERT(
state_ != CoroState::Running,
"ripple::JobQueue::Coro::~Coro : is not running");
@@ -67,9 +65,7 @@ inline JobQueue::Coro::~Coro()
// Resume the coroutine so that it has a chance to clean things up
if (state_ == CoroState::Suspended)
{
stateLock.unlock();
resume();
stateLock.lock();
}
#ifndef NDEBUG
@@ -102,12 +98,7 @@ JobQueue::Coro::yield()
inline bool
JobQueue::Coro::post()
{
#if !defined(NDEBUG)
{
std::lock_guard lk(mutex_run_);
XRPL_ASSERT(state_ == CoroState::Suspended, "JobQueue::Coro::post: coroutine should be suspended!");
}
#endif
XRPL_ASSERT(state_ == CoroState::Suspended, "JobQueue::Coro::post: coroutine should be suspended!");
// sp keeps 'this' alive
if (jq_.addJob(
@@ -124,7 +115,6 @@ inline void
JobQueue::Coro::resume()
{
{
std::lock_guard lk(mutex_run_);
if (state_ != CoroState::Suspended)
{
return;
@@ -152,7 +142,6 @@ JobQueue::Coro::resume()
inline bool
JobQueue::Coro::runnable() const
{
std::unique_lock<std::mutex> lk(mutex_run_);
// There's an edge case where the coroutine has updated the status
// to Finished but the function hasn't exited and therefore, coro_ is
// still valid. However, the coroutine is not technically runnable in this

View File

@@ -62,17 +62,18 @@ public:
{
friend class JobQueue;
private:
public:
enum class CoroState { None, Suspended, Running, Finished };
private:
std::atomic_bool exiting_ = false;
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
CoroState state_ = CoroState::None;
std::atomic<CoroState> state_ = CoroState::None;
std::mutex mutex_;
mutable std::mutex mutex_run_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
@@ -129,6 +130,9 @@ public:
void
resume();
CoroState
state() const { return state_; }
/** Returns true if the Coro is still runnable (has not returned). */
bool
runnable() const;