mirror of
https://github.com/XRPLF/rippled.git
synced 2026-02-26 16:52:32 +00:00
Compare commits
16 Commits
ximinez/nu
...
pratik/std
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
deb4613631 | ||
|
|
62aeaad291 | ||
|
|
72fc5533bc | ||
|
|
958b604511 | ||
|
|
ecc60934e4 | ||
|
|
e0ef06fc31 | ||
|
|
60886f0be4 | ||
|
|
e86479facc | ||
|
|
eb83e111af | ||
|
|
464c09efc7 | ||
|
|
897c75bc6b | ||
|
|
ca15c0efd7 | ||
|
|
bb4bc1d167 | ||
|
|
b9d14fb9e1 | ||
|
|
af30b71043 | ||
|
|
65e63ebef3 |
@@ -101,7 +101,7 @@ jobs:
|
||||
steps:
|
||||
- name: Cleanup workspace (macOS and Windows)
|
||||
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
|
||||
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
|
||||
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
2
.github/workflows/upload-conan-deps.yml
vendored
2
.github/workflows/upload-conan-deps.yml
vendored
@@ -64,7 +64,7 @@ jobs:
|
||||
steps:
|
||||
- name: Cleanup workspace (macOS and Windows)
|
||||
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
|
||||
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
|
||||
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
1407
BoostToStdCoroutineSwitchPlan.md
Normal file
1407
BoostToStdCoroutineSwitchPlan.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -71,12 +71,14 @@ words:
|
||||
- coldwallet
|
||||
- compr
|
||||
- conanfile
|
||||
- cppcoro
|
||||
- conanrun
|
||||
- confs
|
||||
- connectability
|
||||
- coro
|
||||
- coros
|
||||
- cowid
|
||||
- cppcoro
|
||||
- cryptocondition
|
||||
- cryptoconditional
|
||||
- cryptoconditions
|
||||
@@ -99,11 +101,14 @@ words:
|
||||
- endmacro
|
||||
- exceptioned
|
||||
- Falco
|
||||
- fcontext
|
||||
- finalizers
|
||||
- firewalled
|
||||
- fcontext
|
||||
- fmtdur
|
||||
- fsanitize
|
||||
- funclets
|
||||
- gantt
|
||||
- gcov
|
||||
- gcovr
|
||||
- ghead
|
||||
@@ -185,6 +190,7 @@ words:
|
||||
- ostr
|
||||
- pargs
|
||||
- partitioner
|
||||
- pratik
|
||||
- paychan
|
||||
- paychans
|
||||
- permdex
|
||||
@@ -192,6 +198,7 @@ words:
|
||||
- permissioned
|
||||
- pointee
|
||||
- populator
|
||||
- pratik
|
||||
- preauth
|
||||
- preauthorization
|
||||
- preauthorize
|
||||
@@ -206,6 +213,7 @@ words:
|
||||
- queuable
|
||||
- Raphson
|
||||
- replayer
|
||||
- repost
|
||||
- rerere
|
||||
- retriable
|
||||
- RIPD
|
||||
@@ -236,6 +244,7 @@ words:
|
||||
- soci
|
||||
- socidb
|
||||
- sslws
|
||||
- stackful
|
||||
- statsd
|
||||
- STATSDCOLLECTOR
|
||||
- stissue
|
||||
|
||||
289
include/xrpl/core/CoroTask.h
Normal file
289
include/xrpl/core/CoroTask.h
Normal file
@@ -0,0 +1,289 @@
|
||||
#pragma once
|
||||
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
template <typename T = void>
|
||||
class CoroTask;
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// CoroTask<void> — coroutine return type for void-returning coroutines
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
template <>
|
||||
class CoroTask<void>
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
struct promise_type
|
||||
{
|
||||
std::exception_ptr exception_;
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
struct FinalAwaiter
|
||||
{
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
void
|
||||
return_void()
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
exception_ = std::current_exception();
|
||||
}
|
||||
};
|
||||
|
||||
CoroTask() = default;
|
||||
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
// Awaiter interface — allows co_await on a CoroTask
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_; // Symmetric transfer
|
||||
}
|
||||
|
||||
void
|
||||
await_resume()
|
||||
{
|
||||
if (auto& ep = handle_.promise().exception_)
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
|
||||
private:
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// CoroTask<T> — coroutine return type for value-returning coroutines
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
template <typename T>
|
||||
class CoroTask
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
struct promise_type
|
||||
{
|
||||
std::variant<std::monostate, T, std::exception_ptr> result_;
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
struct FinalAwaiter
|
||||
{
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
void
|
||||
return_value(T value)
|
||||
{
|
||||
result_.template emplace<1>(std::move(value));
|
||||
}
|
||||
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
result_.template emplace<2>(std::current_exception());
|
||||
}
|
||||
};
|
||||
|
||||
CoroTask() = default;
|
||||
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_;
|
||||
}
|
||||
|
||||
T
|
||||
await_resume()
|
||||
{
|
||||
auto& result = handle_.promise().result_;
|
||||
if (auto* ep = std::get_if<2>(&result))
|
||||
std::rethrow_exception(*ep);
|
||||
return std::get<1>(std::move(result));
|
||||
}
|
||||
|
||||
private:
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
158
include/xrpl/core/CoroTaskRunner.ipp
Normal file
158
include/xrpl/core/CoroTaskRunner.ipp
Normal file
@@ -0,0 +1,158 @@
|
||||
#pragma once
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
|
||||
create_t,
|
||||
JobQueue& jq,
|
||||
JobType type,
|
||||
std::string const& name)
|
||||
: jq_(jq), type_(type), name_(name), running_(false)
|
||||
{
|
||||
}
|
||||
|
||||
template <class F>
|
||||
void
|
||||
JobQueue::CoroTaskRunner::init(F&& f)
|
||||
{
|
||||
// Store the callable on the heap so it outlives the coroutine frame.
|
||||
// Coroutine frames store a reference to the callable's implicit object
|
||||
// parameter (the lambda). If the callable is a temporary, that reference
|
||||
// dangles after the caller returns. Keeping the callable alive here
|
||||
// ensures the coroutine's captures remain valid.
|
||||
using Fn = std::decay_t<F>;
|
||||
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
|
||||
task_ = store->func(shared_from_this());
|
||||
storedFunc_ = std::move(store);
|
||||
}
|
||||
|
||||
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
|
||||
#endif
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
++jq_.nSuspend_;
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onUndoSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
|
||||
inline auto
|
||||
JobQueue::CoroTaskRunner::suspend()
|
||||
{
|
||||
struct SuspendAwaiter
|
||||
{
|
||||
CoroTaskRunner& runner_;
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
await_suspend(std::coroutine_handle<>) const
|
||||
{
|
||||
runner_.onSuspend();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
return SuspendAwaiter{*this};
|
||||
}
|
||||
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::post()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = true;
|
||||
}
|
||||
|
||||
// sp prevents 'this' from being destroyed while the job is pending
|
||||
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// The coroutine will not run. Clean up running_.
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
return false;
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::resume()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
auto saved = detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(&lvs_);
|
||||
std::lock_guard lock(mutex_);
|
||||
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
|
||||
task_.handle().resume();
|
||||
detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(saved);
|
||||
#ifndef NDEBUG
|
||||
if (task_.done())
|
||||
finished_ = true;
|
||||
#endif
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::runnable() const
|
||||
{
|
||||
return !task_.done();
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::expectEarlyExit()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
if (!finished_)
|
||||
#endif
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
#ifndef NDEBUG
|
||||
finished_ = true;
|
||||
#endif
|
||||
}
|
||||
// Destroy the coroutine frame to break a potential shared_ptr cycle.
|
||||
// The coroutine is at initial_suspend and never ran user code, so
|
||||
// destroying it is safe. Without this, the frame holds a shared_ptr
|
||||
// back to this CoroTaskRunner, creating an unreachable reference cycle.
|
||||
task_ = {};
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::join()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_run_);
|
||||
cv_.wait(lk, [this]() { return running_ == false; });
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
#include <xrpl/basics/LocalValue.h>
|
||||
#include <xrpl/core/ClosureCounter.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobTypeData.h>
|
||||
#include <xrpl/core/JobTypes.h>
|
||||
#include <xrpl/core/detail/Workers.h>
|
||||
@@ -9,6 +10,7 @@
|
||||
|
||||
#include <boost/coroutine/all.hpp>
|
||||
|
||||
#include <coroutine>
|
||||
#include <set>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -119,6 +121,112 @@ public:
|
||||
join();
|
||||
};
|
||||
|
||||
/** C++20 coroutine lifecycle manager. Replaces Coro for new code. */
|
||||
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
|
||||
{
|
||||
private:
|
||||
detail::LocalValues lvs_;
|
||||
JobQueue& jq_;
|
||||
JobType type_;
|
||||
std::string name_;
|
||||
bool running_;
|
||||
std::mutex mutex_;
|
||||
std::mutex mutex_run_;
|
||||
std::condition_variable cv_;
|
||||
CoroTask<void> task_;
|
||||
|
||||
// Type-erased storage to keep the coroutine callable alive.
|
||||
// Coroutine frames store a reference to the callable's implicit
|
||||
// object parameter (the lambda). If the callable is a temporary,
|
||||
// that reference dangles after the call returns. Storing the
|
||||
// callable on the heap here ensures it outlives the coroutine.
|
||||
struct FuncBase
|
||||
{
|
||||
virtual ~FuncBase() = default;
|
||||
};
|
||||
template <class F>
|
||||
struct FuncStore : FuncBase
|
||||
{
|
||||
F func;
|
||||
explicit FuncStore(F&& f) : func(std::move(f))
|
||||
{
|
||||
}
|
||||
};
|
||||
std::unique_ptr<FuncBase> storedFunc_;
|
||||
|
||||
#ifndef NDEBUG
|
||||
bool finished_ = false;
|
||||
#endif
|
||||
|
||||
public:
|
||||
struct create_t
|
||||
{
|
||||
explicit create_t() = default;
|
||||
};
|
||||
|
||||
// Private: Used in the implementation of postCoroTask
|
||||
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
|
||||
|
||||
// Not copy-constructible or assignable
|
||||
CoroTaskRunner(CoroTaskRunner const&) = delete;
|
||||
CoroTaskRunner&
|
||||
operator=(CoroTaskRunner const&) = delete;
|
||||
|
||||
~CoroTaskRunner();
|
||||
|
||||
/** Initialize with a coroutine function.
|
||||
Must be called exactly once, after the object is managed by
|
||||
shared_ptr. This is handled automatically by postCoroTask().
|
||||
*/
|
||||
template <class F>
|
||||
void
|
||||
init(F&& f);
|
||||
|
||||
/** Increment the suspended coroutine count.
|
||||
Called when the coroutine is about to suspend.
|
||||
*/
|
||||
void
|
||||
onSuspend();
|
||||
|
||||
/** Decrement the suspended coroutine count without side effects.
|
||||
Used to undo onSuspend() when a suspend is cancelled.
|
||||
*/
|
||||
void
|
||||
onUndoSuspend();
|
||||
|
||||
/** Suspend coroutine execution.
|
||||
Returns an awaiter for use with co_await.
|
||||
Effects:
|
||||
Increments nSuspend_ in the JobQueue.
|
||||
The coroutine is suspended.
|
||||
The caller must later call post() or resume() to continue.
|
||||
*/
|
||||
auto
|
||||
suspend();
|
||||
|
||||
/** Schedule coroutine execution on the JobQueue.
|
||||
@return true if the job is added to the JobQueue.
|
||||
*/
|
||||
bool
|
||||
post();
|
||||
|
||||
/** Resume coroutine on current thread. */
|
||||
void
|
||||
resume();
|
||||
|
||||
/** Returns true if coroutine hasn't completed. */
|
||||
bool
|
||||
runnable() const;
|
||||
|
||||
/** Once called, allows early exit without an assert. */
|
||||
void
|
||||
expectEarlyExit();
|
||||
|
||||
/** Waits until coroutine completes. */
|
||||
void
|
||||
join();
|
||||
};
|
||||
|
||||
using JobFunction = std::function<void()>;
|
||||
|
||||
JobQueue(
|
||||
@@ -165,6 +273,19 @@ public:
|
||||
std::shared_ptr<Coro>
|
||||
postCoro(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Creates a C++20 coroutine and adds a job to the queue to run it.
|
||||
|
||||
@param t The type of job.
|
||||
@param name Name of the job.
|
||||
@param f Callable with signature
|
||||
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
|
||||
|
||||
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
|
||||
*/
|
||||
template <class F>
|
||||
std::shared_ptr<CoroTaskRunner>
|
||||
postCoroTask(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Jobs waiting at this priority.
|
||||
*/
|
||||
int
|
||||
@@ -379,6 +500,7 @@ private:
|
||||
} // namespace xrpl
|
||||
|
||||
#include <xrpl/core/Coro.ipp>
|
||||
#include <xrpl/core/CoroTaskRunner.ipp>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
@@ -401,4 +523,26 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
||||
return coro;
|
||||
}
|
||||
|
||||
template <class F>
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner>
|
||||
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
|
||||
{
|
||||
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
|
||||
runner->init(std::forward<F>(f));
|
||||
|
||||
// Account for the initial suspension (lazy start).
|
||||
// Mirrors the yield() in the Boost Coro constructor.
|
||||
{
|
||||
std::lock_guard lock(m_mutex);
|
||||
++nSuspend_;
|
||||
}
|
||||
|
||||
if (!runner->post())
|
||||
{
|
||||
runner->expectEarlyExit();
|
||||
runner.reset();
|
||||
}
|
||||
return runner;
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
56
include/xrpl/core/JobQueueAwaiter.h
Normal file
56
include/xrpl/core/JobQueueAwaiter.h
Normal file
@@ -0,0 +1,56 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
|
||||
#include <coroutine>
|
||||
#include <memory>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/** Awaiter that suspends and immediately reschedules on the JobQueue.
|
||||
Equivalent to calling yield() followed by post() in the old Coro API.
|
||||
|
||||
Usage:
|
||||
co_await JobQueueAwaiter{runner};
|
||||
|
||||
What it waits for: The coroutine is re-queued as a job and resumes
|
||||
when a worker thread picks it up.
|
||||
|
||||
Which thread resumes: A JobQueue worker thread.
|
||||
|
||||
What await_resume() returns: void.
|
||||
*/
|
||||
struct JobQueueAwaiter
|
||||
{
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
await_suspend(std::coroutine_handle<>)
|
||||
{
|
||||
// Increment nSuspend (equivalent to yield())
|
||||
runner->onSuspend();
|
||||
// Schedule resume on JobQueue (equivalent to post())
|
||||
if (!runner->post())
|
||||
{
|
||||
// JobQueue is stopping. Undo the suspend count and
|
||||
// don't actually suspend — the coroutine continues
|
||||
// immediately so it can clean up and co_return.
|
||||
runner->onUndoSuspend();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -8,6 +8,7 @@
|
||||
#include <xrpld/rpc/detail/Tuning.h>
|
||||
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/protocol/ApiVersion.h>
|
||||
@@ -131,7 +132,6 @@ public:
|
||||
c,
|
||||
Role::USER,
|
||||
{},
|
||||
{},
|
||||
RPC::apiVersionIfUnspecified},
|
||||
{},
|
||||
{}};
|
||||
@@ -155,11 +155,11 @@ public:
|
||||
|
||||
Json::Value result;
|
||||
gate g;
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
context.params = std::move(params);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -240,28 +240,27 @@ public:
|
||||
c,
|
||||
Role::USER,
|
||||
{},
|
||||
{},
|
||||
RPC::apiVersionIfUnspecified},
|
||||
{},
|
||||
{}};
|
||||
Json::Value result;
|
||||
gate g;
|
||||
// Test RPC::Tuning::max_src_cur source currencies.
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(!result.isMember(jss::error));
|
||||
|
||||
// Test more than RPC::Tuning::max_src_cur source currencies.
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(result.isMember(jss::error));
|
||||
@@ -269,22 +268,22 @@ public:
|
||||
// Test RPC::Tuning::max_auto_src_cur source currencies.
|
||||
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
|
||||
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
context.params = rpf(Account("alice"), Account("bob"), 0);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(!result.isMember(jss::error));
|
||||
|
||||
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
|
||||
env.trust(Account("alice")["AUD"](100), "bob");
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
context.params = rpf(Account("alice"), Account("bob"), 0);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(result.isMember(jss::error));
|
||||
|
||||
460
src/test/core/CoroTask_test.cpp
Normal file
460
src/test/core/CoroTask_test.cpp
Normal file
@@ -0,0 +1,460 @@
|
||||
#include <test/jtx.h>
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/core/JobQueueAwaiter.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
|
||||
namespace xrpl {
|
||||
namespace test {
|
||||
|
||||
class CoroTask_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
class gate
|
||||
{
|
||||
private:
|
||||
std::condition_variable cv_;
|
||||
std::mutex mutex_;
|
||||
bool signaled_ = false;
|
||||
|
||||
public:
|
||||
template <class Rep, class Period>
|
||||
bool
|
||||
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_);
|
||||
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
|
||||
signaled_ = false;
|
||||
return b;
|
||||
}
|
||||
|
||||
void
|
||||
signal()
|
||||
{
|
||||
std::lock_guard lk(mutex_);
|
||||
signaled_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
// Test: CoroTask<void> runs to completion
|
||||
void
|
||||
testVoidCompletion()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("void completion");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: correct_order — suspend, join, post, complete
|
||||
// Mirrors existing Coroutine_test::correct_order
|
||||
void
|
||||
testCorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("correct order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
g1p->signal();
|
||||
co_await runner->suspend();
|
||||
g2p->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g1.wait_for(5s));
|
||||
runner->join();
|
||||
runner->post();
|
||||
BEAST_EXPECT(g2.wait_for(5s));
|
||||
runner->join();
|
||||
}
|
||||
|
||||
// Test: incorrect_order — post before suspend
|
||||
// Mirrors existing Coroutine_test::incorrect_order
|
||||
void
|
||||
testIncorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("incorrect order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
|
||||
runner->post();
|
||||
co_await runner->suspend();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
}
|
||||
|
||||
// Test: JobQueueAwaiter — suspend + auto-repost
|
||||
void
|
||||
testJobQueueAwaiter()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("JobQueueAwaiter");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int step = 0;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*sp = 1;
|
||||
co_await JobQueueAwaiter{runner};
|
||||
*sp = 2;
|
||||
co_await JobQueueAwaiter{runner};
|
||||
*sp = 3;
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(step == 3);
|
||||
}
|
||||
|
||||
// Test: thread_specific_storage — per-coroutine LocalValue isolation
|
||||
// Mirrors existing Coroutine_test::thread_specific_storage
|
||||
void
|
||||
testThreadSpecificStorage()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("thread specific storage");
|
||||
Env env(*this);
|
||||
|
||||
auto& jq = env.app().getJobQueue();
|
||||
|
||||
static int const N = 4;
|
||||
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
|
||||
|
||||
LocalValue<int> lv(-1);
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
|
||||
gate g;
|
||||
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
|
||||
this->BEAST_EXPECT(*lv == -1);
|
||||
*lv = -2;
|
||||
this->BEAST_EXPECT(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
jq.postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
|
||||
(*ap)[id] = runner;
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == -1);
|
||||
**lvp = id;
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
a[i]->join();
|
||||
}
|
||||
for (auto const& r : a)
|
||||
{
|
||||
r->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
r->join();
|
||||
}
|
||||
for (auto const& r : a)
|
||||
{
|
||||
r->post();
|
||||
r->join();
|
||||
}
|
||||
|
||||
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
|
||||
this->BEAST_EXPECT(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
}
|
||||
|
||||
// Test: exception propagation
|
||||
void
|
||||
testExceptionPropagation()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("exception propagation");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
throw std::runtime_error("test exception");
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
// The exception is caught by promise_type::unhandled_exception()
|
||||
// and the coroutine is considered done
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: multiple sequential co_await points
|
||||
void
|
||||
testMultipleYields()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("multiple yields");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int counter = 0;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 1);
|
||||
runner->join();
|
||||
|
||||
runner->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 2);
|
||||
runner->join();
|
||||
|
||||
runner->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 3);
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: CoroTask<T> returns a value via co_return
|
||||
void
|
||||
testValueReturn()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value return");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> { co_return 42; };
|
||||
*rp = co_await inner();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 42);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: CoroTask<T> propagates exceptions from inner coroutines
|
||||
void
|
||||
testValueException()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value exception");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
bool caught = false;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> {
|
||||
throw std::runtime_error("inner error");
|
||||
co_return 0;
|
||||
};
|
||||
try
|
||||
{
|
||||
co_await inner();
|
||||
}
|
||||
catch (std::runtime_error const& e)
|
||||
{
|
||||
*cp = true;
|
||||
}
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(caught);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: CoroTask<T> chaining — nested value-returning coroutines
|
||||
void
|
||||
testValueChaining()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value chaining");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
|
||||
auto mul = [add](int a, int b) -> CoroTask<int> {
|
||||
int sum = co_await add(a, b);
|
||||
co_return sum * 2;
|
||||
};
|
||||
*rp = co_await mul(3, 4);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 14); // (3 + 4) * 2
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: postCoroTask returns nullptr when JobQueue is stopping
|
||||
void
|
||||
testShutdownRejection()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("shutdown rejection");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
// Stop the JobQueue
|
||||
env.app().getJobQueue().stop();
|
||||
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
|
||||
BEAST_EXPECT(!runner);
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testVoidCompletion();
|
||||
testCorrectOrder();
|
||||
testIncorrectOrder();
|
||||
testJobQueueAwaiter();
|
||||
testThreadSpecificStorage();
|
||||
testExceptionPropagation();
|
||||
testMultipleYields();
|
||||
testValueReturn();
|
||||
testValueException();
|
||||
testValueChaining();
|
||||
testShutdownRejection();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
|
||||
|
||||
} // namespace test
|
||||
} // namespace xrpl
|
||||
@@ -40,6 +40,11 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
void
|
||||
correct_order()
|
||||
{
|
||||
@@ -54,13 +59,15 @@ public:
|
||||
}));
|
||||
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobQueue::Coro> c;
|
||||
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
|
||||
c = cr;
|
||||
g1.signal();
|
||||
c->yield();
|
||||
g2.signal();
|
||||
});
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> c;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTest", [cp = &c, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
|
||||
*cp = runner;
|
||||
g1p->signal();
|
||||
co_await runner->suspend();
|
||||
g2p->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g1.wait_for(5s));
|
||||
c->join();
|
||||
c->post();
|
||||
@@ -81,11 +88,17 @@ public:
|
||||
}));
|
||||
|
||||
gate g;
|
||||
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
|
||||
c->post();
|
||||
c->yield();
|
||||
g.signal();
|
||||
});
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTest", [gp = &g](auto runner) -> CoroTask<void> {
|
||||
// Schedule a resume before suspending. The posted job
|
||||
// cannot actually call resume() until the current resume()
|
||||
// releases CoroTaskRunner::mutex_, which only happens after
|
||||
// the coroutine suspends at co_await.
|
||||
runner->post();
|
||||
co_await runner->suspend();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
}
|
||||
|
||||
@@ -101,7 +114,7 @@ public:
|
||||
auto& jq = env.app().getJobQueue();
|
||||
|
||||
static int const N = 4;
|
||||
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
|
||||
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
|
||||
|
||||
LocalValue<int> lv(-1);
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
@@ -118,19 +131,23 @@ public:
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
|
||||
a[id] = c;
|
||||
g.signal();
|
||||
c->yield();
|
||||
jq.postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTest",
|
||||
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
|
||||
(*ap)[id] = runner;
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(*lv == -1);
|
||||
*lv = id;
|
||||
this->BEAST_EXPECT(*lv == id);
|
||||
g.signal();
|
||||
c->yield();
|
||||
this->BEAST_EXPECT(**lvp == -1);
|
||||
**lvp = id;
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(*lv == id);
|
||||
});
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
a[i]->join();
|
||||
}
|
||||
|
||||
@@ -43,87 +43,91 @@ class JobQueue_test : public beast::unit_test::suite
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
void
|
||||
testPostCoro()
|
||||
testPostCoroTask()
|
||||
{
|
||||
jtx::Env env{*this};
|
||||
|
||||
JobQueue& jQueue = env.app().getJobQueue();
|
||||
{
|
||||
// Test repeated post()s until the Coro completes.
|
||||
// Test repeated post()s until the coroutine completes.
|
||||
std::atomic<int> yieldCount{0};
|
||||
auto const coro = jQueue.postCoro(
|
||||
jtCLIENT,
|
||||
"PostCoroTest1",
|
||||
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
|
||||
while (++yieldCount < 4)
|
||||
coroCopy->yield();
|
||||
auto const runner = jQueue.postCoroTask(
|
||||
jtCLIENT, "PostCoroTest1", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
|
||||
while (++(*ycp) < 4)
|
||||
co_await runner->suspend();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(coro != nullptr);
|
||||
BEAST_EXPECT(runner != nullptr);
|
||||
|
||||
// Wait for the Job to run and yield.
|
||||
while (yieldCount == 0)
|
||||
;
|
||||
|
||||
// Now re-post until the Coro says it is done.
|
||||
// Now re-post until the CoroTaskRunner says it is done.
|
||||
int old = yieldCount;
|
||||
while (coro->runnable())
|
||||
while (runner->runnable())
|
||||
{
|
||||
BEAST_EXPECT(coro->post());
|
||||
BEAST_EXPECT(runner->post());
|
||||
while (old == yieldCount)
|
||||
{
|
||||
}
|
||||
coro->join();
|
||||
runner->join();
|
||||
BEAST_EXPECT(++old == yieldCount);
|
||||
}
|
||||
BEAST_EXPECT(yieldCount == 4);
|
||||
}
|
||||
{
|
||||
// Test repeated resume()s until the Coro completes.
|
||||
// Test repeated resume()s until the coroutine completes.
|
||||
int yieldCount{0};
|
||||
auto const coro = jQueue.postCoro(
|
||||
jtCLIENT,
|
||||
"PostCoroTest2",
|
||||
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
|
||||
while (++yieldCount < 4)
|
||||
coroCopy->yield();
|
||||
auto const runner = jQueue.postCoroTask(
|
||||
jtCLIENT, "PostCoroTest2", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
|
||||
while (++(*ycp) < 4)
|
||||
co_await runner->suspend();
|
||||
co_return;
|
||||
});
|
||||
if (!coro)
|
||||
if (!runner)
|
||||
{
|
||||
// There's no good reason we should not get a Coro, but we
|
||||
// There's no good reason we should not get a runner, but we
|
||||
// can't continue without one.
|
||||
BEAST_EXPECT(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for the Job to run and yield.
|
||||
coro->join();
|
||||
runner->join();
|
||||
|
||||
// Now resume until the Coro says it is done.
|
||||
// Now resume until the CoroTaskRunner says it is done.
|
||||
int old = yieldCount;
|
||||
while (coro->runnable())
|
||||
while (runner->runnable())
|
||||
{
|
||||
coro->resume(); // Resume runs synchronously on this thread.
|
||||
runner->resume(); // Resume runs synchronously on this thread.
|
||||
BEAST_EXPECT(++old == yieldCount);
|
||||
}
|
||||
BEAST_EXPECT(yieldCount == 4);
|
||||
}
|
||||
{
|
||||
// If the JobQueue is stopped, we should no
|
||||
// longer be able to add a Coro (and calling postCoro() should
|
||||
// return false).
|
||||
// longer be able to post a coroutine (and calling postCoroTask()
|
||||
// should return nullptr).
|
||||
using namespace std::chrono_literals;
|
||||
jQueue.stop();
|
||||
|
||||
// The Coro should never run, so having the Coro access this
|
||||
// The coroutine should never run, so having it access this
|
||||
// unprotected variable on the stack should be completely safe.
|
||||
// Not recommended for the faint of heart...
|
||||
bool unprotected;
|
||||
auto const coro = jQueue.postCoro(
|
||||
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
|
||||
unprotected = false;
|
||||
auto const runner = jQueue.postCoroTask(
|
||||
jtCLIENT, "PostCoroTest3", [up = &unprotected](auto) -> CoroTask<void> {
|
||||
*up = false;
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(coro == nullptr);
|
||||
BEAST_EXPECT(runner == nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,7 +136,7 @@ public:
|
||||
run() override
|
||||
{
|
||||
testAddJob();
|
||||
testPostCoro();
|
||||
testPostCoroTask();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
#include <xrpld/rpc/RPCHandler.h>
|
||||
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/protocol/ApiVersion.h>
|
||||
#include <xrpl/protocol/STParsedJSON.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
@@ -193,7 +194,6 @@ AMMTest::find_paths_request(
|
||||
c,
|
||||
Role::USER,
|
||||
{},
|
||||
{},
|
||||
RPC::apiVersionIfUnspecified},
|
||||
{},
|
||||
{}};
|
||||
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
|
||||
|
||||
Json::Value result;
|
||||
gate g;
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
context.params = std::move(params);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -1425,7 +1425,6 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
|
||||
c,
|
||||
Role::ADMIN,
|
||||
{},
|
||||
{},
|
||||
RPC::apiMaximumSupportedVersion},
|
||||
jvCommand};
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
#include <xrpl/beast/core/CurrentThreadName.h>
|
||||
#include <xrpl/beast/net/IPAddressConversion.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -99,13 +100,14 @@ GRPCServerImpl::CallData<Request, Response>::process()
|
||||
// ensures that finished is always true when this CallData object
|
||||
// is returned as a tag in handleRpcs(), after sending the response
|
||||
finished_ = true;
|
||||
auto coro = app_.getJobQueue().postCoro(
|
||||
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
|
||||
thisShared->process(coro);
|
||||
auto runner = app_.getJobQueue().postCoroTask(
|
||||
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
|
||||
thisShared->processRequest();
|
||||
co_return;
|
||||
});
|
||||
|
||||
// If coro is null, then the JobQueue has already been shutdown
|
||||
if (!coro)
|
||||
// If runner is null, then the JobQueue has already been shutdown
|
||||
if (!runner)
|
||||
{
|
||||
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
|
||||
responder_.FinishWithError(status, this);
|
||||
@@ -114,7 +116,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
|
||||
|
||||
template <class Request, class Response>
|
||||
void
|
||||
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
|
||||
GRPCServerImpl::CallData<Request, Response>::processRequest()
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -156,7 +158,6 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
|
||||
app_.getLedgerMaster(),
|
||||
usage,
|
||||
role,
|
||||
coro,
|
||||
InfoSub::pointer(),
|
||||
apiVersion},
|
||||
request_};
|
||||
|
||||
@@ -208,7 +208,7 @@ private:
|
||||
private:
|
||||
// process the request. Called inside the coroutine passed to JobQueue
|
||||
void
|
||||
process(std::shared_ptr<JobQueue::Coro> coro);
|
||||
processRequest();
|
||||
|
||||
// return load type of this RPC
|
||||
Resource::Charge
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
#include <xrpld/rpc/Role.h>
|
||||
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/server/InfoSub.h>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -24,7 +23,6 @@ struct Context
|
||||
LedgerMaster& ledgerMaster;
|
||||
Resource::Consumer& consumer;
|
||||
Role role;
|
||||
std::shared_ptr<JobQueue::Coro> coro{};
|
||||
InfoSub::pointer infoSub{};
|
||||
unsigned int apiVersion;
|
||||
};
|
||||
|
||||
@@ -169,13 +169,10 @@ public:
|
||||
|
||||
private:
|
||||
Json::Value
|
||||
processSession(
|
||||
std::shared_ptr<WSSession> const& session,
|
||||
std::shared_ptr<JobQueue::Coro> const& coro,
|
||||
Json::Value const& jv);
|
||||
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
|
||||
|
||||
void
|
||||
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
|
||||
processSession(std::shared_ptr<Session> const&);
|
||||
|
||||
void
|
||||
processRequest(
|
||||
@@ -183,7 +180,6 @@ private:
|
||||
std::string const& request,
|
||||
beast::IP::Endpoint const& remoteIPAddress,
|
||||
Output&&,
|
||||
std::shared_ptr<JobQueue::Coro> coro,
|
||||
std::string_view forwardedFor,
|
||||
std::string_view user);
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <xrpl/basics/make_SSLContext.h>
|
||||
#include <xrpl/beast/net/IPAddressConversion.h>
|
||||
#include <xrpl/beast/rfc2616.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/json/to_string.h>
|
||||
@@ -284,9 +285,10 @@ ServerHandler::onRequest(Session& session)
|
||||
}
|
||||
|
||||
std::shared_ptr<Session> detachedSession = session.detach();
|
||||
auto const postResult = m_jobQueue.postCoro(
|
||||
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
|
||||
processSession(detachedSession, coro);
|
||||
auto const postResult = m_jobQueue.postCoroTask(
|
||||
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
|
||||
processSession(detachedSession);
|
||||
co_return;
|
||||
});
|
||||
if (postResult == nullptr)
|
||||
{
|
||||
@@ -322,17 +324,18 @@ ServerHandler::onWSMessage(
|
||||
|
||||
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
|
||||
|
||||
auto const postResult = m_jobQueue.postCoro(
|
||||
auto const postResult = m_jobQueue.postCoroTask(
|
||||
jtCLIENT_WEBSOCKET,
|
||||
"WS-Client",
|
||||
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
|
||||
auto const jr = this->processSession(session, coro, jv);
|
||||
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
|
||||
auto const jr = this->processSession(session, jv);
|
||||
auto const s = to_string(jr);
|
||||
auto const n = s.length();
|
||||
boost::beast::multi_buffer sb(n);
|
||||
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
|
||||
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
|
||||
session->complete();
|
||||
co_return;
|
||||
});
|
||||
if (postResult == nullptr)
|
||||
{
|
||||
@@ -373,10 +376,7 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
|
||||
}
|
||||
|
||||
Json::Value
|
||||
ServerHandler::processSession(
|
||||
std::shared_ptr<WSSession> const& session,
|
||||
std::shared_ptr<JobQueue::Coro> const& coro,
|
||||
Json::Value const& jv)
|
||||
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
|
||||
{
|
||||
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
|
||||
if (is->getConsumer().disconnect(m_journal))
|
||||
@@ -443,7 +443,6 @@ ServerHandler::processSession(
|
||||
app_.getLedgerMaster(),
|
||||
is->getConsumer(),
|
||||
role,
|
||||
coro,
|
||||
is,
|
||||
apiVersion},
|
||||
jv,
|
||||
@@ -514,18 +513,14 @@ ServerHandler::processSession(
|
||||
return jr;
|
||||
}
|
||||
|
||||
// Run as a coroutine.
|
||||
void
|
||||
ServerHandler::processSession(
|
||||
std::shared_ptr<Session> const& session,
|
||||
std::shared_ptr<JobQueue::Coro> coro)
|
||||
ServerHandler::processSession(std::shared_ptr<Session> const& session)
|
||||
{
|
||||
processRequest(
|
||||
session->port(),
|
||||
buffers_to_string(session->request().body().data()),
|
||||
session->remoteAddress().at_port(0),
|
||||
makeOutput(*session),
|
||||
coro,
|
||||
forwardedFor(session->request()),
|
||||
[&] {
|
||||
auto const iter = session->request().find("X-User");
|
||||
@@ -562,7 +557,6 @@ ServerHandler::processRequest(
|
||||
std::string const& request,
|
||||
beast::IP::Endpoint const& remoteIPAddress,
|
||||
Output&& output,
|
||||
std::shared_ptr<JobQueue::Coro> coro,
|
||||
std::string_view forwardedFor,
|
||||
std::string_view user)
|
||||
{
|
||||
@@ -819,7 +813,6 @@ ServerHandler::processRequest(
|
||||
app_.getLedgerMaster(),
|
||||
usage,
|
||||
role,
|
||||
coro,
|
||||
InfoSub::pointer(),
|
||||
apiVersion},
|
||||
params,
|
||||
|
||||
@@ -7,6 +7,9 @@
|
||||
#include <xrpl/protocol/RPCErr.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
// This interface is deprecated.
|
||||
@@ -37,98 +40,31 @@ doRipplePathFind(RPC::JsonContext& context)
|
||||
PathRequest::pointer request;
|
||||
lpLedger = context.ledgerMaster.getClosedLedger();
|
||||
|
||||
// It doesn't look like there's much odd happening here, but you should
|
||||
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
|
||||
// And we may be flipping around between threads. Here's an overview:
|
||||
//
|
||||
// 1. We're running doRipplePathFind() due to a call to
|
||||
// ripple_path_find. doRipplePathFind() is currently running
|
||||
// inside of a JobQueue::Coro using a JobQueue thread.
|
||||
//
|
||||
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
|
||||
// path-finding request. That request will (probably) run at some
|
||||
// indeterminate future time on a (probably different) JobQueue
|
||||
// thread.
|
||||
//
|
||||
// 3. As a continuation from that path-finding JobQueue thread, the
|
||||
// coroutine we're currently running in (!) is posted to the
|
||||
// JobQueue. Because it is a continuation, that post won't
|
||||
// happen until the path-finding request completes.
|
||||
//
|
||||
// 4. Once the continuation is enqueued, and we have reason to think
|
||||
// the path-finding job is likely to run, then the coroutine we're
|
||||
// running in yield()s. That means it surrenders its thread in
|
||||
// the JobQueue. The coroutine is suspended, but ready to run,
|
||||
// because it is kept resident by a shared_ptr in the
|
||||
// path-finding continuation.
|
||||
//
|
||||
// 5. If all goes well then path-finding runs on a JobQueue thread
|
||||
// and executes its continuation. The continuation posts this
|
||||
// same coroutine (!) to the JobQueue.
|
||||
//
|
||||
// 6. When the JobQueue calls this coroutine, this coroutine resumes
|
||||
// from the line below the coro->yield() and returns the
|
||||
// path-finding result.
|
||||
//
|
||||
// With so many moving parts, what could go wrong?
|
||||
//
|
||||
// Just in terms of the JobQueue refusing to add jobs at shutdown
|
||||
// there are two specific things that can go wrong.
|
||||
//
|
||||
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
|
||||
// rejected (because we're shutting down).
|
||||
//
|
||||
// Fortunately this problem can be addressed by looking at the
|
||||
// return value of makeLegacyPathRequest(). If
|
||||
// makeLegacyPathRequest() cannot get a thread to run the path-find
|
||||
// on, then it returns an empty request.
|
||||
//
|
||||
// 2. The path-finding job might run, but the Coro::post() might be
|
||||
// rejected by the JobQueue (because we're shutting down).
|
||||
//
|
||||
// We handle this case by resuming (not posting) the Coro.
|
||||
// By resuming the Coro, we allow the Coro to run to completion
|
||||
// on the current thread instead of requiring that it run on a
|
||||
// new thread from the JobQueue.
|
||||
//
|
||||
// Both of these failure modes are hard to recreate in a unit test
|
||||
// because they are so dependent on inter-thread timing. However
|
||||
// the failure modes can be observed by synchronously (inside the
|
||||
// rippled source code) shutting down the application. The code to
|
||||
// do so looks like this:
|
||||
//
|
||||
// context.app.signalStop();
|
||||
// while (! context.app.getJobQueue().jobCounter().joined()) { }
|
||||
//
|
||||
// The first line starts the process of shutting down the app.
|
||||
// The second line waits until no more jobs can be added to the
|
||||
// JobQueue before letting the thread continue.
|
||||
//
|
||||
// May 2017
|
||||
// makeLegacyPathRequest enqueues a path-finding job that runs
|
||||
// asynchronously. We block this thread with a condition_variable
|
||||
// until the path-finding continuation signals completion.
|
||||
// If makeLegacyPathRequest cannot schedule the job (e.g. during
|
||||
// shutdown), it returns an empty request and we skip the wait.
|
||||
std::mutex mtx;
|
||||
std::condition_variable cv;
|
||||
bool pathDone = false;
|
||||
|
||||
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
|
||||
request,
|
||||
[&context]() {
|
||||
// Copying the shared_ptr keeps the coroutine alive up
|
||||
// through the return. Otherwise the storage under the
|
||||
// captured reference could evaporate when we return from
|
||||
// coroCopy->resume(). This is not strictly necessary, but
|
||||
// will make maintenance easier.
|
||||
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
|
||||
if (!coroCopy->post())
|
||||
[&]() {
|
||||
{
|
||||
// The post() failed, so we won't get a thread to let
|
||||
// the Coro finish. We'll call Coro::resume() so the
|
||||
// Coro can finish on our thread. Otherwise the
|
||||
// application will hang on shutdown.
|
||||
coroCopy->resume();
|
||||
std::lock_guard lk(mtx);
|
||||
pathDone = true;
|
||||
}
|
||||
cv.notify_one();
|
||||
},
|
||||
context.consumer,
|
||||
lpLedger,
|
||||
context.params);
|
||||
if (request)
|
||||
{
|
||||
context.coro->yield();
|
||||
std::unique_lock lk(mtx);
|
||||
cv.wait(lk, [&] { return pathDone; });
|
||||
jvResult = request->doStatus(context.params);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user