mirror of
https://github.com/XRPLF/rippled.git
synced 2026-02-26 16:52:32 +00:00
Compare commits
7 Commits
develop
...
pratik/std
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb83e111af | ||
|
|
464c09efc7 | ||
|
|
897c75bc6b | ||
|
|
ca15c0efd7 | ||
|
|
bb4bc1d167 | ||
|
|
b9d14fb9e1 | ||
|
|
af30b71043 |
1407
BoostToStdCoroutineSwitchPlan.md
Normal file
1407
BoostToStdCoroutineSwitchPlan.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -71,6 +71,7 @@ words:
|
||||
- coldwallet
|
||||
- compr
|
||||
- conanfile
|
||||
- cppcoro
|
||||
- conanrun
|
||||
- confs
|
||||
- connectability
|
||||
@@ -101,9 +102,11 @@ words:
|
||||
- Falco
|
||||
- finalizers
|
||||
- firewalled
|
||||
- fcontext
|
||||
- fmtdur
|
||||
- fsanitize
|
||||
- funclets
|
||||
- gantt
|
||||
- gcov
|
||||
- gcovr
|
||||
- ghead
|
||||
@@ -185,6 +188,7 @@ words:
|
||||
- ostr
|
||||
- pargs
|
||||
- partitioner
|
||||
- pratik
|
||||
- paychan
|
||||
- paychans
|
||||
- permdex
|
||||
@@ -206,6 +210,7 @@ words:
|
||||
- queuable
|
||||
- Raphson
|
||||
- replayer
|
||||
- repost
|
||||
- rerere
|
||||
- retriable
|
||||
- RIPD
|
||||
@@ -236,6 +241,7 @@ words:
|
||||
- soci
|
||||
- socidb
|
||||
- sslws
|
||||
- stackful
|
||||
- statsd
|
||||
- STATSDCOLLECTOR
|
||||
- stissue
|
||||
|
||||
289
include/xrpl/core/CoroTask.h
Normal file
289
include/xrpl/core/CoroTask.h
Normal file
@@ -0,0 +1,289 @@
|
||||
#pragma once
|
||||
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
template <typename T = void>
|
||||
class CoroTask;
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// CoroTask<void> — coroutine return type for void-returning coroutines
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
template <>
|
||||
class CoroTask<void>
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
struct promise_type
|
||||
{
|
||||
std::exception_ptr exception_;
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
struct FinalAwaiter
|
||||
{
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
void
|
||||
return_void()
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
exception_ = std::current_exception();
|
||||
}
|
||||
};
|
||||
|
||||
CoroTask() = default;
|
||||
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
// Awaiter interface — allows co_await on a CoroTask
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_; // Symmetric transfer
|
||||
}
|
||||
|
||||
void
|
||||
await_resume()
|
||||
{
|
||||
if (auto& ep = handle_.promise().exception_)
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
|
||||
private:
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// CoroTask<T> — coroutine return type for value-returning coroutines
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
template <typename T>
|
||||
class CoroTask
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
struct promise_type
|
||||
{
|
||||
std::variant<std::monostate, T, std::exception_ptr> result_;
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
struct FinalAwaiter
|
||||
{
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
void
|
||||
return_value(T value)
|
||||
{
|
||||
result_.template emplace<1>(std::move(value));
|
||||
}
|
||||
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
result_.template emplace<2>(std::current_exception());
|
||||
}
|
||||
};
|
||||
|
||||
CoroTask() = default;
|
||||
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_;
|
||||
}
|
||||
|
||||
T
|
||||
await_resume()
|
||||
{
|
||||
auto& result = handle_.promise().result_;
|
||||
if (auto* ep = std::get_if<2>(&result))
|
||||
std::rethrow_exception(*ep);
|
||||
return std::get<1>(std::move(result));
|
||||
}
|
||||
|
||||
private:
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
158
include/xrpl/core/CoroTaskRunner.ipp
Normal file
158
include/xrpl/core/CoroTaskRunner.ipp
Normal file
@@ -0,0 +1,158 @@
|
||||
#pragma once
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
|
||||
create_t,
|
||||
JobQueue& jq,
|
||||
JobType type,
|
||||
std::string const& name)
|
||||
: jq_(jq), type_(type), name_(name), running_(false)
|
||||
{
|
||||
}
|
||||
|
||||
template <class F>
|
||||
void
|
||||
JobQueue::CoroTaskRunner::init(F&& f)
|
||||
{
|
||||
// Store the callable on the heap so it outlives the coroutine frame.
|
||||
// Coroutine frames store a reference to the callable's implicit object
|
||||
// parameter (the lambda). If the callable is a temporary, that reference
|
||||
// dangles after the caller returns. Keeping the callable alive here
|
||||
// ensures the coroutine's captures remain valid.
|
||||
using Fn = std::decay_t<F>;
|
||||
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
|
||||
task_ = store->func(shared_from_this());
|
||||
storedFunc_ = std::move(store);
|
||||
}
|
||||
|
||||
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
|
||||
#endif
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
++jq_.nSuspend_;
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onUndoSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
|
||||
inline auto
|
||||
JobQueue::CoroTaskRunner::suspend()
|
||||
{
|
||||
struct SuspendAwaiter
|
||||
{
|
||||
CoroTaskRunner& runner_;
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
await_suspend(std::coroutine_handle<>) const
|
||||
{
|
||||
runner_.onSuspend();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
return SuspendAwaiter{*this};
|
||||
}
|
||||
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::post()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = true;
|
||||
}
|
||||
|
||||
// sp prevents 'this' from being destroyed while the job is pending
|
||||
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// The coroutine will not run. Clean up running_.
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
return false;
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::resume()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
auto saved = detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(&lvs_);
|
||||
std::lock_guard lock(mutex_);
|
||||
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
|
||||
task_.handle().resume();
|
||||
detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(saved);
|
||||
#ifndef NDEBUG
|
||||
if (task_.done())
|
||||
finished_ = true;
|
||||
#endif
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::runnable() const
|
||||
{
|
||||
return !task_.done();
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::expectEarlyExit()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
if (!finished_)
|
||||
#endif
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
#ifndef NDEBUG
|
||||
finished_ = true;
|
||||
#endif
|
||||
}
|
||||
// Destroy the coroutine frame to break a potential shared_ptr cycle.
|
||||
// The coroutine is at initial_suspend and never ran user code, so
|
||||
// destroying it is safe. Without this, the frame holds a shared_ptr
|
||||
// back to this CoroTaskRunner, creating an unreachable reference cycle.
|
||||
task_ = {};
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::join()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_run_);
|
||||
cv_.wait(lk, [this]() { return running_ == false; });
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
#include <xrpl/basics/LocalValue.h>
|
||||
#include <xrpl/core/ClosureCounter.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobTypeData.h>
|
||||
#include <xrpl/core/JobTypes.h>
|
||||
#include <xrpl/core/detail/Workers.h>
|
||||
@@ -9,6 +10,7 @@
|
||||
|
||||
#include <boost/coroutine/all.hpp>
|
||||
|
||||
#include <coroutine>
|
||||
#include <set>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -119,6 +121,112 @@ public:
|
||||
join();
|
||||
};
|
||||
|
||||
/** C++20 coroutine lifecycle manager. Replaces Coro for new code. */
|
||||
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
|
||||
{
|
||||
private:
|
||||
detail::LocalValues lvs_;
|
||||
JobQueue& jq_;
|
||||
JobType type_;
|
||||
std::string name_;
|
||||
bool running_;
|
||||
std::mutex mutex_;
|
||||
std::mutex mutex_run_;
|
||||
std::condition_variable cv_;
|
||||
CoroTask<void> task_;
|
||||
|
||||
// Type-erased storage to keep the coroutine callable alive.
|
||||
// Coroutine frames store a reference to the callable's implicit
|
||||
// object parameter (the lambda). If the callable is a temporary,
|
||||
// that reference dangles after the call returns. Storing the
|
||||
// callable on the heap here ensures it outlives the coroutine.
|
||||
struct FuncBase
|
||||
{
|
||||
virtual ~FuncBase() = default;
|
||||
};
|
||||
template <class F>
|
||||
struct FuncStore : FuncBase
|
||||
{
|
||||
F func;
|
||||
explicit FuncStore(F&& f) : func(std::move(f))
|
||||
{
|
||||
}
|
||||
};
|
||||
std::unique_ptr<FuncBase> storedFunc_;
|
||||
|
||||
#ifndef NDEBUG
|
||||
bool finished_ = false;
|
||||
#endif
|
||||
|
||||
public:
|
||||
struct create_t
|
||||
{
|
||||
explicit create_t() = default;
|
||||
};
|
||||
|
||||
// Private: Used in the implementation of postCoroTask
|
||||
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
|
||||
|
||||
// Not copy-constructible or assignable
|
||||
CoroTaskRunner(CoroTaskRunner const&) = delete;
|
||||
CoroTaskRunner&
|
||||
operator=(CoroTaskRunner const&) = delete;
|
||||
|
||||
~CoroTaskRunner();
|
||||
|
||||
/** Initialize with a coroutine function.
|
||||
Must be called exactly once, after the object is managed by
|
||||
shared_ptr. This is handled automatically by postCoroTask().
|
||||
*/
|
||||
template <class F>
|
||||
void
|
||||
init(F&& f);
|
||||
|
||||
/** Increment the suspended coroutine count.
|
||||
Called when the coroutine is about to suspend.
|
||||
*/
|
||||
void
|
||||
onSuspend();
|
||||
|
||||
/** Decrement the suspended coroutine count without side effects.
|
||||
Used to undo onSuspend() when a suspend is cancelled.
|
||||
*/
|
||||
void
|
||||
onUndoSuspend();
|
||||
|
||||
/** Suspend coroutine execution.
|
||||
Returns an awaiter for use with co_await.
|
||||
Effects:
|
||||
Increments nSuspend_ in the JobQueue.
|
||||
The coroutine is suspended.
|
||||
The caller must later call post() or resume() to continue.
|
||||
*/
|
||||
auto
|
||||
suspend();
|
||||
|
||||
/** Schedule coroutine execution on the JobQueue.
|
||||
@return true if the job is added to the JobQueue.
|
||||
*/
|
||||
bool
|
||||
post();
|
||||
|
||||
/** Resume coroutine on current thread. */
|
||||
void
|
||||
resume();
|
||||
|
||||
/** Returns true if coroutine hasn't completed. */
|
||||
bool
|
||||
runnable() const;
|
||||
|
||||
/** Once called, allows early exit without an assert. */
|
||||
void
|
||||
expectEarlyExit();
|
||||
|
||||
/** Waits until coroutine completes. */
|
||||
void
|
||||
join();
|
||||
};
|
||||
|
||||
using JobFunction = std::function<void()>;
|
||||
|
||||
JobQueue(
|
||||
@@ -165,6 +273,19 @@ public:
|
||||
std::shared_ptr<Coro>
|
||||
postCoro(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Creates a C++20 coroutine and adds a job to the queue to run it.
|
||||
|
||||
@param t The type of job.
|
||||
@param name Name of the job.
|
||||
@param f Callable with signature
|
||||
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
|
||||
|
||||
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
|
||||
*/
|
||||
template <class F>
|
||||
std::shared_ptr<CoroTaskRunner>
|
||||
postCoroTask(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Jobs waiting at this priority.
|
||||
*/
|
||||
int
|
||||
@@ -379,6 +500,7 @@ private:
|
||||
} // namespace xrpl
|
||||
|
||||
#include <xrpl/core/Coro.ipp>
|
||||
#include <xrpl/core/CoroTaskRunner.ipp>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
@@ -401,4 +523,26 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
||||
return coro;
|
||||
}
|
||||
|
||||
template <class F>
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner>
|
||||
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
|
||||
{
|
||||
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
|
||||
runner->init(std::forward<F>(f));
|
||||
|
||||
// Account for the initial suspension (lazy start).
|
||||
// Mirrors the yield() in the Boost Coro constructor.
|
||||
{
|
||||
std::lock_guard lock(m_mutex);
|
||||
++nSuspend_;
|
||||
}
|
||||
|
||||
if (!runner->post())
|
||||
{
|
||||
runner->expectEarlyExit();
|
||||
runner.reset();
|
||||
}
|
||||
return runner;
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
56
include/xrpl/core/JobQueueAwaiter.h
Normal file
56
include/xrpl/core/JobQueueAwaiter.h
Normal file
@@ -0,0 +1,56 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
|
||||
#include <coroutine>
|
||||
#include <memory>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/** Awaiter that suspends and immediately reschedules on the JobQueue.
|
||||
Equivalent to calling yield() followed by post() in the old Coro API.
|
||||
|
||||
Usage:
|
||||
co_await JobQueueAwaiter{runner};
|
||||
|
||||
What it waits for: The coroutine is re-queued as a job and resumes
|
||||
when a worker thread picks it up.
|
||||
|
||||
Which thread resumes: A JobQueue worker thread.
|
||||
|
||||
What await_resume() returns: void.
|
||||
*/
|
||||
struct JobQueueAwaiter
|
||||
{
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
await_suspend(std::coroutine_handle<>)
|
||||
{
|
||||
// Increment nSuspend (equivalent to yield())
|
||||
runner->onSuspend();
|
||||
// Schedule resume on JobQueue (equivalent to post())
|
||||
if (!runner->post())
|
||||
{
|
||||
// JobQueue is stopping. Undo the suspend count and
|
||||
// don't actually suspend — the coroutine continues
|
||||
// immediately so it can clean up and co_return.
|
||||
runner->onUndoSuspend();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
460
src/test/core/CoroTask_test.cpp
Normal file
460
src/test/core/CoroTask_test.cpp
Normal file
@@ -0,0 +1,460 @@
|
||||
#include <test/jtx.h>
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/core/JobQueueAwaiter.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
|
||||
namespace xrpl {
|
||||
namespace test {
|
||||
|
||||
class CoroTask_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
class gate
|
||||
{
|
||||
private:
|
||||
std::condition_variable cv_;
|
||||
std::mutex mutex_;
|
||||
bool signaled_ = false;
|
||||
|
||||
public:
|
||||
template <class Rep, class Period>
|
||||
bool
|
||||
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_);
|
||||
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
|
||||
signaled_ = false;
|
||||
return b;
|
||||
}
|
||||
|
||||
void
|
||||
signal()
|
||||
{
|
||||
std::lock_guard lk(mutex_);
|
||||
signaled_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
// Test: CoroTask<void> runs to completion
|
||||
void
|
||||
testVoidCompletion()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("void completion");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: correct_order — suspend, join, post, complete
|
||||
// Mirrors existing Coroutine_test::correct_order
|
||||
void
|
||||
testCorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("correct order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
g1p->signal();
|
||||
co_await runner->suspend();
|
||||
g2p->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g1.wait_for(5s));
|
||||
runner->join();
|
||||
runner->post();
|
||||
BEAST_EXPECT(g2.wait_for(5s));
|
||||
runner->join();
|
||||
}
|
||||
|
||||
// Test: incorrect_order — post before suspend
|
||||
// Mirrors existing Coroutine_test::incorrect_order
|
||||
void
|
||||
testIncorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("incorrect order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
|
||||
runner->post();
|
||||
co_await runner->suspend();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
}
|
||||
|
||||
// Test: JobQueueAwaiter — suspend + auto-repost
|
||||
void
|
||||
testJobQueueAwaiter()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("JobQueueAwaiter");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int step = 0;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*sp = 1;
|
||||
co_await JobQueueAwaiter{runner};
|
||||
*sp = 2;
|
||||
co_await JobQueueAwaiter{runner};
|
||||
*sp = 3;
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(step == 3);
|
||||
}
|
||||
|
||||
// Test: thread_specific_storage — per-coroutine LocalValue isolation
|
||||
// Mirrors existing Coroutine_test::thread_specific_storage
|
||||
void
|
||||
testThreadSpecificStorage()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("thread specific storage");
|
||||
Env env(*this);
|
||||
|
||||
auto& jq = env.app().getJobQueue();
|
||||
|
||||
static int const N = 4;
|
||||
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
|
||||
|
||||
LocalValue<int> lv(-1);
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
|
||||
gate g;
|
||||
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
|
||||
this->BEAST_EXPECT(*lv == -1);
|
||||
*lv = -2;
|
||||
this->BEAST_EXPECT(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
jq.postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
|
||||
(*ap)[id] = runner;
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == -1);
|
||||
**lvp = id;
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
a[i]->join();
|
||||
}
|
||||
for (auto const& r : a)
|
||||
{
|
||||
r->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
r->join();
|
||||
}
|
||||
for (auto const& r : a)
|
||||
{
|
||||
r->post();
|
||||
r->join();
|
||||
}
|
||||
|
||||
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
|
||||
this->BEAST_EXPECT(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
}
|
||||
|
||||
// Test: exception propagation
|
||||
void
|
||||
testExceptionPropagation()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("exception propagation");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
throw std::runtime_error("test exception");
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
// The exception is caught by promise_type::unhandled_exception()
|
||||
// and the coroutine is considered done
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: multiple sequential co_await points
|
||||
void
|
||||
testMultipleYields()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("multiple yields");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int counter = 0;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 1);
|
||||
runner->join();
|
||||
|
||||
runner->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 2);
|
||||
runner->join();
|
||||
|
||||
runner->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 3);
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: CoroTask<T> returns a value via co_return
|
||||
void
|
||||
testValueReturn()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value return");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> { co_return 42; };
|
||||
*rp = co_await inner();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 42);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: CoroTask<T> propagates exceptions from inner coroutines
|
||||
void
|
||||
testValueException()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value exception");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
bool caught = false;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> {
|
||||
throw std::runtime_error("inner error");
|
||||
co_return 0;
|
||||
};
|
||||
try
|
||||
{
|
||||
co_await inner();
|
||||
}
|
||||
catch (std::runtime_error const& e)
|
||||
{
|
||||
*cp = true;
|
||||
}
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(caught);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: CoroTask<T> chaining — nested value-returning coroutines
|
||||
void
|
||||
testValueChaining()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value chaining");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
|
||||
auto mul = [add](int a, int b) -> CoroTask<int> {
|
||||
int sum = co_await add(a, b);
|
||||
co_return sum * 2;
|
||||
};
|
||||
*rp = co_await mul(3, 4);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 14); // (3 + 4) * 2
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
// Test: postCoroTask returns nullptr when JobQueue is stopping
|
||||
void
|
||||
testShutdownRejection()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("shutdown rejection");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
// Stop the JobQueue
|
||||
env.app().getJobQueue().stop();
|
||||
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
|
||||
BEAST_EXPECT(!runner);
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testVoidCompletion();
|
||||
testCorrectOrder();
|
||||
testIncorrectOrder();
|
||||
testJobQueueAwaiter();
|
||||
testThreadSpecificStorage();
|
||||
testExceptionPropagation();
|
||||
testMultipleYields();
|
||||
testValueReturn();
|
||||
testValueException();
|
||||
testValueChaining();
|
||||
testShutdownRejection();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
|
||||
|
||||
} // namespace test
|
||||
} // namespace xrpl
|
||||
Reference in New Issue
Block a user