diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp index 8cd746bbff..bd229f9e7f 100644 --- a/src/test/core/Coroutine_test.cpp +++ b/src/test/core/Coroutine_test.cpp @@ -212,6 +212,30 @@ public: BEAST_EXPECT(shouldStop.has_value() && *shouldStop == true); } + void + coroutineGetsDestroyedBeforeExecuting() + { + using namespace std::chrono_literals; + using namespace jtx; + + testcase("Coroutine gets destroyed before executing"); + + Env env(*this, envconfig([](std::unique_ptr cfg) { + cfg->FORCE_MULTI_THREAD = true; + return cfg; + })); + + { + auto coro = std::make_shared( + Coro_create_t{}, env.app().getJobQueue(), JobType::jtCLIENT, "test", [](auto coro) { + + }); + } + + pass(); + + } + void run() override { @@ -219,6 +243,7 @@ public: incorrect_order(); thread_specific_storage(); stopJobQueueWhenCoroutineSuspended(); + coroutineGetsDestroyedBeforeExecuting(); } }; diff --git a/src/xrpld/core/Coro.ipp b/src/xrpld/core/Coro.ipp index 3c547a8e93..457c662a20 100644 --- a/src/xrpld/core/Coro.ipp +++ b/src/xrpld/core/Coro.ipp @@ -34,20 +34,21 @@ JobQueue::Coro::Coro( : jq_(jq) , type_(type) , name_(name) - , running_(false) , coro_( [this, fn = std::forward(f)]( boost::coroutines::asymmetric_coroutine::push_type& do_yield) { yield_ = &do_yield; yield(); + // self makes Coro alive until this function returns + std::shared_ptr self; if (!shouldStop()) { - fn(shared_from_this()); + self = shared_from_this(); + fn(self); } -#ifndef NDEBUG - finished_ = true; -#endif + state_ = CoroState::Finished; + cv_.notify_all(); }, boost::coroutines::attributes(megabytes(1))) { @@ -55,8 +56,16 @@ JobQueue::Coro::Coro( inline JobQueue::Coro::~Coro() { + XRPL_ASSERT(state_ != CoroState::Running, "ripple::JobQueue::Coro::~Coro : is not running"); + exiting_ = true; + // Resume the coroutine so that it has a chance to clean things up + if (state_ == CoroState::Suspended) + { + resume(); + } + #ifndef NDEBUG - XRPL_ASSERT(finished_, "ripple::JobQueue::Coro::~Coro : is finished"); + XRPL_ASSERT(state_ == CoroState::Finished, "ripple::JobQueue::Coro::~Coro : is finished"); #endif } @@ -69,6 +78,7 @@ JobQueue::Coro::yield() { return; } + state_ = CoroState::Suspended; ++jq_.nSuspend_; jq_.m_suspendedCoros[this] = weak_from_this(); jq_.cv_.notify_all(); @@ -79,11 +89,6 @@ JobQueue::Coro::yield() inline bool JobQueue::Coro::post() { - { - std::lock_guard lk(mutex_run_); - running_ = true; - } - // sp keeps 'this' alive if (jq_.addJob( type_, name_, [this, sp = shared_from_this()]() { resume(); })) @@ -91,9 +96,6 @@ JobQueue::Coro::post() return true; } - // The coroutine will not run. Clean up running_. - std::lock_guard lk(mutex_run_); - running_ = false; cv_.notify_all(); return false; } @@ -103,12 +105,16 @@ JobQueue::Coro::resume() { { std::lock_guard lk(mutex_run_); - running_ = true; + if (state_ != CoroState::Suspended) + { + return; + } + state_ = CoroState::Running; } { std::lock_guard lock(jq_.m_mutex); - --jq_.nSuspend_; jq_.m_suspendedCoros.erase(this); + --jq_.nSuspend_; jq_.cv_.notify_all(); } auto saved = detail::getLocalValues().release(); @@ -120,9 +126,6 @@ JobQueue::Coro::resume() coro_(); detail::getLocalValues().release(); detail::getLocalValues().reset(saved); - std::lock_guard lk(mutex_run_); - running_ = false; - cv_.notify_all(); } inline bool @@ -135,7 +138,7 @@ inline void JobQueue::Coro::join() { std::unique_lock lk(mutex_run_); - cv_.wait(lk, [this]() { return running_ == false; }); + cv_.wait(lk, [this]() { return state_ != CoroState::Running; }); } } // namespace ripple diff --git a/src/xrpld/core/JobQueue.h b/src/xrpld/core/JobQueue.h index 9fcf2519b5..d0259ebaa5 100644 --- a/src/xrpld/core/JobQueue.h +++ b/src/xrpld/core/JobQueue.h @@ -63,19 +63,26 @@ public: friend class JobQueue; private: + enum class CoroState + { + None, + Suspended, + Running, + Finished + }; + detail::LocalValues lvs_; JobQueue& jq_; JobType type_; std::string name_; - bool running_; + std::atomic state_ = CoroState::None; std::mutex mutex_; std::mutex mutex_run_; std::condition_variable cv_; boost::coroutines::asymmetric_coroutine::pull_type coro_; boost::coroutines::asymmetric_coroutine::push_type* yield_; -#ifndef NDEBUG - bool finished_ = false; -#endif + + std::atomic_bool exiting_ = false; public: // Private: Used in the implementation diff --git a/src/xrpld/core/detail/JobQueue.cpp b/src/xrpld/core/detail/JobQueue.cpp index a912fa1ee9..68800fee48 100644 --- a/src/xrpld/core/detail/JobQueue.cpp +++ b/src/xrpld/core/detail/JobQueue.cpp @@ -29,7 +29,7 @@ namespace ripple { bool JobQueue::Coro::shouldStop() const { - return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_; + return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_ || exiting_; } JobQueue::JobQueue(