#pragma once namespace xrpl { template JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f) : jq_(jq) , type_(type) , name_(name) , coro_( // Stack size of 1MB wasn't sufficient for deep calls. ASAN tests flagged the issue. Hence // increasing the size to 1.5MB. boost::context::protected_fixedsize_stack(1536 * 1024), [this, fn = std::forward(f)]( boost::coroutines2::asymmetric_coroutine::push_type& do_yield) { yield_ = &do_yield; yield(); fn(shared_from_this()); #ifndef NDEBUG finished_ = true; #endif }) { } inline JobQueue::Coro::~Coro() { #ifndef NDEBUG XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished"); #endif } inline void JobQueue::Coro::yield() const { { std::lock_guard lock(jq_.m_mutex); ++jq_.nSuspend_; } (*yield_)(); } inline bool JobQueue::Coro::post() { { std::lock_guard lk(mutex_run_); running_ = true; } // sp keeps 'this' alive if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); })) { return true; } // The coroutine will not run. Clean up running_. std::lock_guard lk(mutex_run_); running_ = false; cv_.notify_all(); return false; } inline void JobQueue::Coro::resume() { { std::lock_guard lk(mutex_run_); running_ = true; } { std::lock_guard lk(jq_.m_mutex); --jq_.nSuspend_; } auto saved = detail::getLocalValues().release(); detail::getLocalValues().reset(&lvs_); std::lock_guard lock(mutex_); // A late resume() can arrive after the coroutine has already completed. // This is an expected (if rare) outcome of the race condition documented // in JobQueue.h:354-377 where post() schedules a resume job before the // coroutine yields — the mutex serializes access, but by the time this // resume() acquires the lock the coroutine may have already run to // completion. Calling operator() on a completed boost::coroutine2 is // undefined behavior, so we must check and skip invoking the coroutine // body if it has already completed. if (coro_) { coro_(); } detail::getLocalValues().release(); detail::getLocalValues().reset(saved); std::lock_guard lk(mutex_run_); running_ = false; cv_.notify_all(); } inline bool JobQueue::Coro::runnable() const { return static_cast(coro_); } inline void JobQueue::Coro::expectEarlyExit() { #ifndef NDEBUG if (!finished_) #endif { // expectEarlyExit() must only ever be called from outside the // Coro's stack. It you're inside the stack you can simply return // and be done. // // That said, since we're outside the Coro's stack, we need to // decrement the nSuspend that the Coro's call to yield caused. std::lock_guard lock(jq_.m_mutex); --jq_.nSuspend_; #ifndef NDEBUG finished_ = true; #endif } } inline void JobQueue::Coro::join() { std::unique_lock lk(mutex_run_); cv_.wait(lk, [this]() { return running_ == false; }); } } // namespace xrpl