Compare commits

..

5 Commits

Author SHA1 Message Date
Pratik Mankawde
ca15c0efd7 Add C++20 coroutine primitives: CoroTask, CoroTaskRunner, JobQueueAwaiter
Introduce the core building blocks for migrating from Boost.Coroutine to
C++20 stackless coroutines (Milestone 1):

- CoroTask<T>: RAII coroutine return type with promise_type, symmetric
  transfer via FinalAwaiter, and lazy start (suspend_always)
- CoroTaskRunner: Lifecycle manager (nested in JobQueue) mirroring the
  existing Coro class — handles LocalValues swap, nSuspend_ accounting,
  mutex-guarded resume, and join/post semantics
- JobQueueAwaiter: Convenience awaiter combining suspend + auto-repost,
  with graceful fallback when JobQueue is stopping
- postCoroTask(): JobQueue entry point for launching C++20 coroutines
- CoroTask_test.cpp: 8 unit tests covering completion, suspend/resume
  ordering, LocalValue isolation, exception propagation, and shutdown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 17:38:28 +00:00
Pratik Mankawde
bb4bc1d167 doc updated with branch names
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 17:38:08 +00:00
Pratik Mankawde
b9d14fb9e1 document update
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 17:03:00 +00:00
Pratik Mankawde
af30b71043 Plan doc added
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 16:36:45 +00:00
Ayaz Salikhov
65e63ebef3 chore: Update cleanup-workspace to delete old .conan2 dir on macOS (#6412) 2026-02-25 01:12:16 +00:00
10 changed files with 2376 additions and 51 deletions

View File

@@ -101,7 +101,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

View File

@@ -64,7 +64,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,291 @@
#pragma once
#include <coroutine>
#include <exception>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
// --------------------------------------------------------------------------
// CoroTask<void> — coroutine return type for void-returning coroutines
// --------------------------------------------------------------------------
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
struct promise_type
{
std::exception_ptr exception_;
std::coroutine_handle<> continuation_;
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
std::suspend_always
initial_suspend() noexcept
{
return {};
}
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
void
return_void()
{
}
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
CoroTask() = default;
explicit CoroTask(Handle h) : handle_(h)
{
}
~CoroTask()
{
if (handle_)
handle_.destroy();
}
CoroTask(CoroTask&& other) noexcept
: handle_(std::exchange(other.handle_, {}))
{
}
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
Handle
handle() const
{
return handle_;
}
bool
done() const
{
return handle_ && handle_.done();
}
// Awaiter interface — allows co_await on a CoroTask
bool
await_ready() const noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
void
await_resume()
{
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
Handle handle_;
};
// --------------------------------------------------------------------------
// CoroTask<T> — coroutine return type for value-returning coroutines
// --------------------------------------------------------------------------
template <typename T>
class CoroTask
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
struct promise_type
{
std::variant<std::monostate, T, std::exception_ptr> result_;
std::coroutine_handle<> continuation_;
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
std::suspend_always
initial_suspend() noexcept
{
return {};
}
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
CoroTask() = default;
explicit CoroTask(Handle h) : handle_(h)
{
}
~CoroTask()
{
if (handle_)
handle_.destroy();
}
CoroTask(CoroTask&& other) noexcept
: handle_(std::exchange(other.handle_, {}))
{
}
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
Handle
handle() const
{
return handle_;
}
bool
done() const
{
return handle_ && handle_.done();
}
bool
await_ready() const noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_;
}
T
await_resume()
{
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
Handle handle_;
};
} // namespace xrpl

View File

@@ -0,0 +1,150 @@
#pragma once
namespace xrpl {
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), running_(false)
{
}
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
task_ = std::forward<F>(f)(shared_from_this());
}
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
XRPL_ASSERT(
finished_,
"xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
inline auto
JobQueue::CoroTaskRunner::suspend()
{
struct SuspendAwaiter
{
CoroTaskRunner& runner_;
bool
await_ready() const noexcept
{
return false;
}
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(
!task_.done(),
"xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
#ifndef NDEBUG
if (task_.done())
finished_ = true;
#endif
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
return !task_.done();
}
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -9,6 +9,9 @@
#include <boost/coroutine/all.hpp>
#include <xrpl/core/CoroTask.h>
#include <coroutine>
#include <set>
namespace xrpl {
@@ -119,6 +122,96 @@ public:
join();
};
/** C++20 coroutine lifecycle manager. Replaces Coro for new code. */
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
{
private:
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
CoroTask<void> task_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
struct create_t
{
explicit create_t() = default;
};
// Private: Used in the implementation of postCoroTask
CoroTaskRunner(
create_t,
JobQueue&,
JobType,
std::string const&);
// Not copy-constructible or assignable
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
~CoroTaskRunner();
/** Initialize with a coroutine function.
Must be called exactly once, after the object is managed by
shared_ptr. This is handled automatically by postCoroTask().
*/
template <class F>
void
init(F&& f);
/** Increment the suspended coroutine count.
Called when the coroutine is about to suspend.
*/
void
onSuspend();
/** Decrement the suspended coroutine count without side effects.
Used to undo onSuspend() when a suspend is cancelled.
*/
void
onUndoSuspend();
/** Suspend coroutine execution.
Returns an awaiter for use with co_await.
Effects:
Increments nSuspend_ in the JobQueue.
The coroutine is suspended.
The caller must later call post() or resume() to continue.
*/
auto
suspend();
/** Schedule coroutine execution on the JobQueue.
@return true if the job is added to the JobQueue.
*/
bool
post();
/** Resume coroutine on current thread. */
void
resume();
/** Returns true if coroutine hasn't completed. */
bool
runnable() const;
/** Once called, allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine completes. */
void
join();
};
using JobFunction = std::function<void()>;
JobQueue(
@@ -165,6 +258,19 @@ public:
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Creates a C++20 coroutine and adds a job to the queue to run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
int
@@ -379,6 +485,7 @@ private:
} // namespace xrpl
#include <xrpl/core/Coro.ipp>
#include <xrpl/core/CoroTaskRunner.ipp>
namespace xrpl {
@@ -401,4 +508,27 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
return coro;
}
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
{
auto runner = std::make_shared<CoroTaskRunner>(
CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
// Account for the initial suspension (lazy start).
// Mirrors the yield() in the Boost Coro constructor.
{
std::lock_guard lock(m_mutex);
++nSuspend_;
}
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
}
} // namespace xrpl

View File

@@ -0,0 +1,56 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/** Awaiter that suspends and immediately reschedules on the JobQueue.
Equivalent to calling yield() followed by post() in the old Coro API.
Usage:
co_await JobQueueAwaiter{runner};
What it waits for: The coroutine is re-queued as a job and resumes
when a worker thread picks it up.
Which thread resumes: A JobQueue worker thread.
What await_resume() returns: void.
*/
struct JobQueueAwaiter
{
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
bool
await_ready() const noexcept
{
return false;
}
bool
await_suspend(std::coroutine_handle<>)
{
// Increment nSuspend (equivalent to yield())
runner->onSuspend();
// Schedule resume on JobQueue (equivalent to post())
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// don't actually suspend — the coroutine continues
// immediately so it can clean up and co_return.
runner->onUndoSuspend();
return false;
}
return true;
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -16,7 +16,6 @@
// Add new amendments to the top of this list.
// Keep it sorted in reverse chronological order.
XRPL_FEATURE(DefragDirectories, Supported::no, VoteBehavior::DefaultNo)
XRPL_FIX (PermissionedDomainInvariant, Supported::yes, VoteBehavior::DefaultNo)
XRPL_FIX (ExpiredNFTokenOfferRemoval, Supported::yes, VoteBehavior::DefaultNo)
XRPL_FIX (BatchInnerSigs, Supported::no, VoteBehavior::DefaultNo)

View File

@@ -10,14 +10,6 @@ namespace xrpl {
namespace directory {
struct Gap
{
uint64_t const page;
SLE::pointer node;
uint64_t const nextPage;
SLE::pointer next;
};
std::uint64_t
createRoot(
ApplyView& view,
@@ -119,9 +111,7 @@ insertPage(
if (page == 0)
return std::nullopt;
if (!view.rules().enabled(fixDirectoryLimit) && page >= dirNodeMaxPages) // Old pages limit
{
return std::nullopt;
}
// We are about to create a new node; we'll link it to
// the chain first:
@@ -143,8 +133,12 @@ insertPage(
// it's the default.
if (page != 1)
node->setFieldU64(sfIndexPrevious, page - 1);
XRPL_ASSERT_PARTS(!nextPage, "xrpl::directory::insertPage", "nextPage has default value");
/* Reserved for future use when directory pages may be inserted in
* between two other pages instead of only at the end of the chain.
if (nextPage)
node->setFieldU64(sfIndexNext, nextPage);
*/
describe(node);
view.insert(node);
@@ -160,7 +154,7 @@ ApplyView::dirAdd(
uint256 const& key,
std::function<void(std::shared_ptr<SLE> const&)> const& describe)
{
auto const root = peek(directory);
auto root = peek(directory);
if (!root)
{
@@ -170,43 +164,6 @@ ApplyView::dirAdd(
auto [page, node, indexes] = directory::findPreviousPage(*this, directory, root);
if (rules().enabled(featureDefragDirectories))
{
// If there are more nodes than just the root, and there's no space in
// the last one, walk backwards to find one with space, or to find one
// missing.
std::optional<directory::Gap> gapPages;
while (page && indexes.size() >= dirNodeMaxEntries)
{
// Find a page with space, or a gap in pages.
auto [prevPage, prevNode, prevIndexes] =
directory::findPreviousPage(*this, directory, node);
if (!gapPages && prevPage != page - 1)
gapPages.emplace(prevPage, prevNode, page, node);
page = prevPage;
node = prevNode;
indexes = prevIndexes;
}
// We looped through all the pages back to the root.
if (!page)
{
// If we found a gap, use it.
if (gapPages)
{
return directory::insertPage(
*this,
gapPages->page,
gapPages->node,
gapPages->nextPage,
gapPages->next,
key,
directory,
describe);
}
std::tie(page, node, indexes) = directory::findPreviousPage(*this, directory, root);
}
}
// If there's space, we use it:
if (indexes.size() < dirNodeMaxEntries)
{

View File

@@ -0,0 +1,358 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
class CoroTask_test : public beast::unit_test::suite
{
public:
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// Test: CoroTask<void> runs to completion
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto) -> CoroTask<void> {
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
// Test: correct_order — suspend, join, post, complete
// Mirrors existing Coroutine_test::correct_order
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto runner) -> CoroTask<void> {
r = runner;
g1.signal();
co_await runner->suspend();
g2.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
// Test: incorrect_order — post before suspend
// Mirrors existing Coroutine_test::incorrect_order
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
// Test: JobQueueAwaiter — suspend + auto-repost
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto runner) -> CoroTask<void> {
step = 1;
co_await JobQueueAwaiter{runner};
step = 2;
co_await JobQueueAwaiter{runner};
step = 3;
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
// Test: thread_specific_storage — per-coroutine LocalValue isolation
// Mirrors existing Coroutine_test::thread_specific_storage
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&, id = i](auto runner) -> CoroTask<void> {
a[id] = runner;
g.signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
// Test: exception propagation
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto) -> CoroTask<void> {
g.signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
// Test: multiple sequential co_await points
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto runner) -> CoroTask<void> {
r = runner;
++counter;
g.signal();
co_await runner->suspend();
++counter;
g.signal();
co_await runner->suspend();
++counter;
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
// Test: postCoroTask returns nullptr when JobQueue is stopping
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[&](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl