Fix the bug

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-09-16 16:14:43 +01:00
parent 32a3f0a867
commit a1f6580e54
4 changed files with 60 additions and 25 deletions

View File

@@ -212,6 +212,30 @@ public:
BEAST_EXPECT(shouldStop.has_value() && *shouldStop == true);
}
void
coroutineGetsDestroyedBeforeExecuting()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Coroutine gets destroyed before executing");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
{
auto coro = std::make_shared<JobQueue::Coro>(
Coro_create_t{}, env.app().getJobQueue(), JobType::jtCLIENT, "test", [](auto coro) {
});
}
pass();
}
void
run() override
{
@@ -219,6 +243,7 @@ public:
incorrect_order();
thread_specific_storage();
stopJobQueueWhenCoroutineSuspended();
coroutineGetsDestroyedBeforeExecuting();
}
};

View File

@@ -34,20 +34,21 @@ JobQueue::Coro::Coro(
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type&
do_yield) {
yield_ = &do_yield;
yield();
// self makes Coro alive until this function returns
std::shared_ptr<Coro> self;
if (!shouldStop())
{
fn(shared_from_this());
self = shared_from_this();
fn(self);
}
#ifndef NDEBUG
finished_ = true;
#endif
state_ = CoroState::Finished;
cv_.notify_all();
},
boost::coroutines::attributes(megabytes(1)))
{
@@ -55,8 +56,16 @@ JobQueue::Coro::Coro(
inline JobQueue::Coro::~Coro()
{
XRPL_ASSERT(state_ != CoroState::Running, "ripple::JobQueue::Coro::~Coro : is not running");
exiting_ = true;
// Resume the coroutine so that it has a chance to clean things up
if (state_ == CoroState::Suspended)
{
resume();
}
#ifndef NDEBUG
XRPL_ASSERT(finished_, "ripple::JobQueue::Coro::~Coro : is finished");
XRPL_ASSERT(state_ == CoroState::Finished, "ripple::JobQueue::Coro::~Coro : is finished");
#endif
}
@@ -69,6 +78,7 @@ JobQueue::Coro::yield()
{
return;
}
state_ = CoroState::Suspended;
++jq_.nSuspend_;
jq_.m_suspendedCoros[this] = weak_from_this();
jq_.cv_.notify_all();
@@ -79,11 +89,6 @@ JobQueue::Coro::yield()
inline bool
JobQueue::Coro::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp keeps 'this' alive
if (jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() { resume(); }))
@@ -91,9 +96,6 @@ JobQueue::Coro::post()
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
@@ -103,12 +105,16 @@ JobQueue::Coro::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
if (state_ != CoroState::Suspended)
{
return;
}
state_ = CoroState::Running;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
jq_.m_suspendedCoros.erase(this);
--jq_.nSuspend_;
jq_.cv_.notify_all();
}
auto saved = detail::getLocalValues().release();
@@ -120,9 +126,6 @@ JobQueue::Coro::resume()
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
@@ -135,7 +138,7 @@ inline void
JobQueue::Coro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
cv_.wait(lk, [this]() { return state_ != CoroState::Running; });
}
} // namespace ripple

View File

@@ -63,19 +63,26 @@ public:
friend class JobQueue;
private:
enum class CoroState
{
None,
Suspended,
Running,
Finished
};
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::atomic<CoroState> state_ = CoroState::None;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
std::atomic_bool exiting_ = false;
public:
// Private: Used in the implementation

View File

@@ -29,7 +29,7 @@ namespace ripple {
bool
JobQueue::Coro::shouldStop() const
{
return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_;
return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_ || exiting_;
}
JobQueue::JobQueue(