Compare commits

..

21 Commits

Author SHA1 Message Date
Pratik Mankawde
de8f8a5368 Remove Boost.Coroutine dependency and old Coro API
Delete the old Boost.Coroutine-based Coro class and all supporting
infrastructure, now that all callers have been migrated to the C++20
CoroTaskRunner API:

- Delete include/xrpl/core/Coro.ipp (entire file)
- Remove class Coro, Coro_create_t, postCoro() from JobQueue.h
- Remove the large comment block explaining old Coro lifecycle
- Remove boost::coroutine from cmake COMPONENTS and link targets
- Remove Boost::context from cmake find_dependency (was only needed
  by Boost.Coroutine)
- Remove BOOST_COROUTINES_NO_DEPRECATION_WARNING compile definition
- Remove boost::coroutine from conanfile.py requirements

This completes the Boost.Coroutine → C++20 coroutines migration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:31 +00:00
Pratik Mankawde
3e37424b19 Use pointer-by-value captures in coroutine test lambdas
Defense-in-depth against a GCC 14 bug where reference captures in
coroutine lambdas are corrupted in the coroutine frame. The root
cause is fixed in CoroTaskRunner::init() (heap storage), but using
explicit pointer captures avoids any residual risk.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:20 +00:00
Pratik Mankawde
3dcab21342 Remove redundant CoroTask.h includes from test files
CoroTask.h is already transitively included via JobQueue.h.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:19 +00:00
Pratik Mankawde
44fd14b26b Apply pre-commit formatting fixes to test files
clang-format auto-formatting adjustments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:19 +00:00
Pratik Mankawde
16348fd6c5 Migrate Coroutine_test and JobQueue_test from Boost.Coroutine to C++20 coroutines
Rewrite all test coroutine code to use postCoroTask() and co_await
runner->suspend() instead of the old postCoro() / Coro::yield() API:

- Coroutine_test: Migrate correct_order, incorrect_order, and
  thread_specific_storage tests. Replace shared_ptr<JobQueue::Coro>
  with shared_ptr<JobQueue::CoroTaskRunner>, yield() with co_await
  runner->suspend().
- JobQueue_test: Rename testPostCoro to testPostCoroTask. Migrate all
  3 sub-tests (repeated post, repeated resume, shutdown rejection)
  to the new API.

After this change, zero .cpp files reference postCoro() or
JobQueue::Coro. The old API declarations remain in JobQueue.h and
Coro.ipp for removal in the cleanup milestone.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:19 +00:00
Pratik Mankawde
f7fd476748 Update processRequest comment to Doxygen format
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:11 +00:00
Pratik Mankawde
f1011b60a7 Add missing words to cspell dictionary
Add cppcoro, fcontext, gantt, pratik, repost, stackful to
cspell.config.yaml to fix cspell check failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:11 +00:00
Pratik Mankawde
a5418d041e Apply pre-commit formatting fixes
clang-format and prettier auto-formatting adjustments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:11 +00:00
Pratik Mankawde
f8b7500229 Fix missed Context aggregate initialization in Application.cpp
Remove the extra {} that was for the now-deleted Context::coro field
in the RPC::JsonContext construction in Application::startGeometry().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:11 +00:00
Pratik Mankawde
71793e4cf2 Migrate production entry points from Boost.Coroutine to C++20 coroutines
Replace all postCoro() call sites with postCoroTask() using C++20
coroutine lambdas. The key changes are:

- Remove Context::coro field (shared_ptr<JobQueue::Coro>) from
  RPC::Context, eliminating it from all aggregate initializations
- Replace RipplePathFind's yield/post/resume pattern with a local
  std::condition_variable that blocks until path-finding completes,
  avoiding colored-function infection across the RPC call chain
- Switch ServerHandler entry points (onRequest, onWSMessage) from
  postCoro to postCoroTask with co_return lambdas
- Switch GRPCServer::CallData::process() to use postCoroTask,
  rename private handler to processRequest()
- Update Path_test and AMMTest to use postCoroTask (they set
  context.coro which no longer exists)

The old postCoro() API remains available for Coroutine_test and
JobQueue_test, which will be migrated in a subsequent commit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:21:11 +00:00
Pratik Mankawde
51ae47cdc6 Fix shared_ptr reference cycle causing ASAN leaks in CoroTaskRunner
After a coroutine completes, the frame remains alive holding a captured
shared_ptr<CoroTaskRunner> back to its owner. This creates an unreachable
cycle: runner -> task_ -> frame -> shared_ptr<runner>.

Break the cycle in resume() by destroying the coroutine frame (task_ = {})
and the stored callable when the coroutine is done. Also fix runnable() to
handle the null-handle state after cleanup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 17:20:53 +00:00
Pratik Mankawde
185921ea94 comments and document update
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-27 16:44:58 +00:00
Pratik Mankawde
403abe6408 Merge branch 'develop' into pratik/std-coro/add-coroutine-primitives 2026-02-27 15:31:40 +00:00
Mayukha Vadari
404f35d556 test: Grep for failures in CI (#6339)
This change adjusts the CI tests to make it easier to spot errors, without needing to sift through the thousands of lines of output.
2026-02-27 03:01:38 +00:00
Pratik Mankawde
eb83e111af Fix coroutine lambda lifetime and add value-returning tests
Store the coroutine callable on the heap in CoroTaskRunner::init()
via a type-erased FuncStore wrapper. Coroutine frames store a
reference to the callable's implicit object parameter (the lambda);
if the callable is a temporary, that reference dangles after the
caller returns. This caused stack-use-after-scope (ASAN), assertion
failures, and hangs across multiple compilers.

Also fix expectEarlyExit() to destroy the coroutine frame when
postCoroTask() fails, breaking a potential shared_ptr cycle.

Switch all coroutine test lambda captures from [&] to explicit
pointer-by-value as defense-in-depth against GCC 14 coroutine
frame corruption. Add value-returning CoroTask<T> tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 14:09:26 +00:00
Pratik Mankawde
464c09efc7 Apply pre-commit formatting fixes
clang-format: collapse single-line initializer lists and function
arguments. prettier: add blank lines in markdown lists.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 13:17:33 +00:00
Pratik Mankawde
897c75bc6b Add cspell dictionary words for coroutine migration plan doc
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 12:37:34 +00:00
Pratik Mankawde
ca15c0efd7 Add C++20 coroutine primitives: CoroTask, CoroTaskRunner, JobQueueAwaiter
Introduce the core building blocks for migrating from Boost.Coroutine to
C++20 stackless coroutines (Milestone 1):

- CoroTask<T>: RAII coroutine return type with promise_type, symmetric
  transfer via FinalAwaiter, and lazy start (suspend_always)
- CoroTaskRunner: Lifecycle manager (nested in JobQueue) mirroring the
  existing Coro class — handles LocalValues swap, nSuspend_ accounting,
  mutex-guarded resume, and join/post semantics
- JobQueueAwaiter: Convenience awaiter combining suspend + auto-repost,
  with graceful fallback when JobQueue is stopping
- postCoroTask(): JobQueue entry point for launching C++20 coroutines
- CoroTask_test.cpp: 8 unit tests covering completion, suspend/resume
  ordering, LocalValue isolation, exception propagation, and shutdown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 17:38:28 +00:00
Pratik Mankawde
bb4bc1d167 doc updated with branch names
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 17:38:08 +00:00
Pratik Mankawde
b9d14fb9e1 document update
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 17:03:00 +00:00
Pratik Mankawde
af30b71043 Plan doc added
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 16:36:45 +00:00
31 changed files with 4023 additions and 483 deletions

View File

@@ -229,8 +229,21 @@ jobs:
env:
BUILD_NPROC: ${{ steps.nproc.outputs.nproc }}
run: |
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}"
set -o pipefail
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}" 2>&1 | tee unittest.log
- name: Show test failure summary
if: ${{ failure() && !inputs.build_only }}
working-directory: ${{ runner.os == 'Windows' && format('{0}/{1}', env.BUILD_DIR, inputs.build_type) || env.BUILD_DIR }}
run: |
if [ ! -f unittest.log ]; then
echo "unittest.log not found; embedded tests may not have run."
exit 0
fi
if ! grep -E "failed" unittest.log; then
echo "Log present but no failure lines found in unittest.log."
fi
- name: Debug failure (Linux)
if: ${{ failure() && runner.os == 'Linux' && !inputs.build_only }}
run: |

File diff suppressed because it is too large Load Diff

View File

@@ -16,8 +16,6 @@ find_dependency(Boost
COMPONENTS
chrono
container
context
coroutine
date_time
filesystem
program_options

View File

@@ -22,7 +22,6 @@ target_compile_definitions(
BOOST_FILESYSTEM_NO_DEPRECATED
>
$<$<NOT:$<BOOL:${boost_show_deprecated}>>:
BOOST_COROUTINES_NO_DEPRECATION_WARNING
BOOST_BEAST_ALLOW_DEPRECATED
BOOST_FILESYSTEM_DEPRECATED
>

View File

@@ -4,7 +4,6 @@ include(XrplSanitizers)
find_package(Boost REQUIRED
COMPONENTS chrono
container
coroutine
date_time
filesystem
json
@@ -21,7 +20,6 @@ target_link_libraries(
INTERFACE Boost::headers
Boost::chrono
Boost::container
Boost::coroutine
Boost::date_time
Boost::filesystem
Boost::json

View File

@@ -196,7 +196,6 @@ class Xrpl(ConanFile):
"boost::headers",
"boost::chrono",
"boost::container",
"boost::coroutine",
"boost::date_time",
"boost::filesystem",
"boost::json",

View File

@@ -71,12 +71,14 @@ words:
- coldwallet
- compr
- conanfile
- cppcoro
- conanrun
- confs
- connectability
- coro
- coros
- cowid
- cppcoro
- cryptocondition
- cryptoconditional
- cryptoconditions
@@ -99,11 +101,14 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fcontext
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -185,6 +190,7 @@ words:
- ostr
- pargs
- partitioner
- pratik
- paychan
- paychans
- permdex
@@ -192,6 +198,7 @@ words:
- permissioned
- pointee
- populator
- pratik
- preauth
- preauthorization
- preauthorize
@@ -206,6 +213,7 @@ words:
- queuable
- Raphson
- replayer
- repost
- rerere
- retriable
- RIPD
@@ -236,6 +244,7 @@ words:
- soci
- socidb
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue

View File

@@ -10,7 +10,6 @@
#include <cctype>
#include <iterator>
#include <string>
#include <string_view>
#include <vector>
namespace beast {
@@ -182,7 +181,7 @@ split_commas(FwdIt first, FwdIt last)
template <class Result = std::vector<std::string>>
Result
split_commas(std::string_view const& s)
split_commas(boost::beast::string_view const& s)
{
return split_commas(s.begin(), s.end());
}

View File

@@ -1,122 +0,0 @@
#pragma once
#include <xrpl/basics/ByteUtilities.h>
namespace xrpl {
template <class F>
JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f)
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
},
boost::coroutines::attributes(megabytes(1)))
{
}
inline JobQueue::Coro::~Coro()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
#endif
}
inline void
JobQueue::Coro::yield() const
{
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
(*yield_)();
}
inline bool
JobQueue::Coro::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp keeps 'this' alive
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline void
JobQueue::Coro::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(static_cast<bool>(coro_), "xrpl::JobQueue::Coro::resume : is runnable");
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::Coro::runnable() const
{
return static_cast<bool>(coro_);
}
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void
JobQueue::Coro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -0,0 +1,687 @@
#pragma once
#include <coroutine>
#include <exception>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
/**
* CoroTask<void> -- coroutine return type for void-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<void>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - exception_ : std::exception_ptr |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_void() |
* | + unhandled_exception() |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter
* +-----------------------------------------------+
* | await_suspend(h): |
* | if continuation_ set -> symmetric transfer |
* | else -> noop_coroutine |
* +-----------------------------------------------+
*
* Design Notes
* ------------
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
* body does not execute until the handle is explicitly resumed.
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
* of void/bool, allowing the scheduler to jump directly to the next
* coroutine without growing the call stack.
* - Continuation chaining: when one CoroTask is co_await-ed inside
* another, the caller's handle is stored as continuation_ so
* FinalAwaiter can resume it when this task finishes.
* - Move-only: the handle is exclusively owned; copy is deleted.
*
* Usage Examples
* ==============
*
* 1. Basic void coroutine (the most common case in rippled):
*
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
* // do something
* co_await runner->suspend(); // yield control
* // resumed later via runner->post() or runner->resume()
* co_return;
* }
*
* 2. co_await-ing one CoroTask<void> from another (chaining):
*
* CoroTask<void> inner() {
* // ...
* co_return;
* }
* CoroTask<void> outer() {
* co_await inner(); // continuation_ links outer -> inner
* co_return; // FinalAwaiter resumes outer
* }
*
* 3. Exceptions propagate through co_await:
*
* CoroTask<void> failing() {
* throw std::runtime_error("oops");
* co_return;
* }
* CoroTask<void> caller() {
* try { co_await failing(); }
* catch (std::runtime_error const&) { // caught here }
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Dangling references in coroutine parameters.
* Coroutine parameters are copied into the frame, but references
* are NOT -- they are stored as-is. If the referent goes out of scope
* before the coroutine finishes, you get use-after-free.
*
* // BROKEN -- local dies before coroutine runs:
* CoroTask<void> bad(int& ref) { co_return; }
* void launch() {
* int local = 42;
* auto task = bad(local); // frame stores &local
* } // local destroyed; frame holds dangling ref
*
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
*
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
* When a lambda that returns CoroTask captures by reference ([&]),
* GCC 14 may generate a corrupted coroutine frame. Always capture
* by explicit pointer-to-value instead:
*
* // BROKEN on GCC 14:
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
*
* // FIX -- capture pointers explicitly:
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
*
* BUG-RISK: Resuming a destroyed or completed CoroTask.
* Calling handle().resume() after the coroutine has already run to
* completion (done() == true) is undefined behavior. The CoroTaskRunner
* guards against this with an XRPL_ASSERT, but standalone usage of
* CoroTask must check done() before resuming.
*
* BUG-RISK: Moving a CoroTask that is being awaited.
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
* or destroying A will invalidate the continuation link. Never move
* or reassign a CoroTask while it is mid-execution or being awaited.
*
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
* There is no built-in notification when the coroutine finishes.
* The caller must use external synchronization (e.g. CoroTaskRunner::join
* or a gate/condition_variable) to know when it is done.
*
* LIMITATION: No cancellation token.
* There is no way to cancel a suspended CoroTask from outside. The
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
* after each co_await and co_return early if needed.
*
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
* If a coroutine calls a regular function that wants to "yield", it
* cannot. Only the immediate coroutine body can use co_await.
* This is acceptable for rippled because all yield() sites are shallow.
*/
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise. Compiler uses this to manage coroutine state.
* Stores the exception (if any) and the continuation handle for
* symmetric transfer back to the awaiting coroutine.
*/
struct promise_type
{
// Captured exception from the coroutine body, rethrown in
// await_resume() when this task is co_await-ed by a caller.
std::exception_ptr exception_;
// Handle to the coroutine that is co_await-ing this task.
// Set by await_suspend(). FinalAwaiter uses it for symmetric
// transfer back to the caller. Null if this is a top-level task.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. The coroutine body does not execute until the
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Awaiter returned by final_suspend(). Uses symmetric transfer:
* if a continuation exists, transfers control directly to it
* (tail-call, no stack growth). Otherwise returns noop_coroutine
* so the coroutine frame stays alive for the owner to destroy.
*/
struct FinalAwaiter
{
/**
* Always false. We need await_suspend to run for
* symmetric transfer.
*/
bool
await_ready() noexcept
{
return false;
}
/**
* Symmetric transfer: returns the continuation handle so
* the compiler emits a tail-call instead of a nested resume.
* If no continuation is set, returns noop_coroutine to
* suspend at final_suspend without destroying the frame.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
/**
* Returns FinalAwaiter for symmetric transfer at coroutine end.
*/
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return;` (void coroutine).
*/
void
return_void()
{
}
/**
* Called by the compiler when an exception escapes the coroutine
* body. Captures it for later rethrowing in await_resume().
*/
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `co_await someCoroTask;` --
/**
* Always false. This task is lazy, so co_await always suspends
* the caller to set up the continuation link.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores the caller's handle as our continuation, then returns
* our handle for symmetric transfer (caller suspends, we resume).
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
/**
* Called when the caller resumes after co_await. Rethrows any
* exception captured by unhandled_exception().
*/
void
await_resume()
{
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
/**
* CoroTask<T> -- coroutine return type for value-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<T>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - result_ : variant<monostate, T, |
* | exception_ptr> |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_value(T) -> stores in result_[1] |
* | + unhandled_exception -> stores in result_[2] |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
*
* Value Extraction
* ----------------
* await_resume() inspects the variant:
* - index 2 (exception_ptr) -> rethrow
* - index 1 (T) -> return value via move
*
* Usage Examples
* ==============
*
* 1. Simple value return:
*
* CoroTask<int> computeAnswer() { co_return 42; }
*
* CoroTask<void> caller() {
* int v = co_await computeAnswer(); // v == 42
* }
*
* 2. Chaining value-returning coroutines:
*
* CoroTask<int> add(int a, int b) { co_return a + b; }
* CoroTask<int> doubleSum(int a, int b) {
* int s = co_await add(a, b);
* co_return s * 2;
* }
*
* 3. Exception propagation from inner to outer:
*
* CoroTask<int> failing() {
* throw std::runtime_error("bad");
* co_return 0; // never reached
* }
* CoroTask<void> caller() {
* try {
* int v = co_await failing(); // throws here
* } catch (std::runtime_error const& e) {
* // e.what() == "bad"
* }
* }
*
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
* ================================================================
*
* BUG-RISK: await_resume() moves the value out of the variant.
* Calling co_await on the same CoroTask<T> instance twice is undefined
* behavior -- the second call will see a moved-from T. CoroTask is
* single-shot: one co_return, one co_await.
*
* BUG-RISK: T must be move-constructible.
* return_value(T) takes by value and moves into the variant.
* Types that are not movable cannot be used as T.
*
* LIMITATION: No co_yield support.
* CoroTask<T> only supports a single co_return. It does not implement
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
* compile error. For streaming values, a different return type
* (e.g. Generator<T>) would be needed.
*
* LIMITATION: Result is only accessible via co_await.
* There is no .get() or .result() method. The value can only be
* extracted by co_await-ing the CoroTask<T> from inside another
* coroutine. For extracting results in non-coroutine code, pass a
* pointer to the caller and write through it (as the tests do).
*/
template <typename T>
class CoroTask
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise for value-returning coroutines.
* Stores the result as a variant: monostate (not yet set),
* T (co_return value), or exception_ptr (unhandled exception).
*/
struct promise_type
{
// Tri-state result:
// index 0 (monostate) -- coroutine has not yet completed
// index 1 (T) -- co_return value stored here
// index 2 (exception) -- unhandled exception captured here
std::variant<std::monostate, T, std::exception_ptr> result_;
// Handle to the coroutine co_await-ing this task. Used by
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. Coroutine body does not run until explicitly resumed.
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Symmetric-transfer awaiter at coroutine completion.
* Same pattern as CoroTask<void>::FinalAwaiter.
*/
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
/**
* Returns continuation for symmetric transfer, or
* noop_coroutine if this is a top-level task.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return value;`.
* Moves the value into result_ at index 1.
*
* @param value The value to store
*/
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
/**
* Captures unhandled exceptions at index 2 of result_.
* Rethrown later in await_resume().
*/
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
/**
* Always false. co_await always suspends to set up continuation.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores caller as continuation, returns our handle for
* symmetric transfer.
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_;
}
/**
* Extracts the result: rethrows if exception, otherwise moves
* the T value out of the variant. Single-shot: calling twice
* on the same task is undefined (moved-from T).
*
* @return The co_return-ed value
*/
T
await_resume()
{
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
} // namespace xrpl

View File

@@ -0,0 +1,318 @@
#pragma once
/**
* @file CoroTaskRunner.ipp
*
* CoroTaskRunner inline implementation.
*
* This file contains the business logic for managing C++20 coroutines
* on the JobQueue. It is included at the bottom of JobQueue.h.
*
* Data Flow: suspend / post / resume cycle
* =========================================
*
* coroutine body CoroTaskRunner JobQueue
* -------------- -------------- --------
* |
* co_await runner->suspend()
* |
* +--- await_suspend ------> onSuspend()
* | ++nSuspend_ ------------> nSuspend_
* | [coroutine is now suspended]
* |
* . (externally or by JobQueueAwaiter)
* .
* +--- (caller calls) -----> post()
* | running_ = true
* | addJob(resume) ----------> job enqueued
* | |
* | [worker picks up]
* | |
* +--- <----- resume() <-----------------------------------+
* | --nSuspend_ ------> nSuspend_
* | swap in LocalValues (lvs_)
* | task_.handle().resume()
* | |
* | [coroutine body continues here]
* | |
* | swap out LocalValues
* | running_ = false
* | cv_.notify_all()
* v
*
* Thread Safety
* =============
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
* races cannot resume the coroutine while it is still running.
* (See the race condition discussion in JobQueue.h)
* - mutex_run_ : guards running_ flag; used by join() to wait for completion.
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
*
* Common Mistakes When Modifying This File
* =========================================
*
* 1. Changing lock ordering.
* resume() acquires locks in this order: mutex_run_ -> jq_.m_mutex -> mutex_.
* Acquiring them in a different order WILL deadlock. Any new code path
* that touches these mutexes must follow the same order.
*
* 2. Removing the shared_from_this() capture in post().
* The lambda passed to addJob captures [this, sp = shared_from_this()].
* If you remove sp, 'this' can be destroyed before the job runs,
* causing use-after-free. The sp capture is load-bearing.
*
* 3. Forgetting to decrement nSuspend_ on a new code path.
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
* suspension path (e.g. a new awaiter) and forget to decrement on resume
* or on failure, JobQueue::stop() will hang.
*
* 4. Calling task_.handle().resume() without holding mutex_.
* This allows a race where the coroutine runs on two threads
* simultaneously. Always hold mutex_ around resume().
*
* 5. Swapping LocalValues outside of the mutex_ critical section.
* The swap-in and swap-out of LocalValues must bracket the resume()
* call. If you move the swap-out before the lock_guard(mutex_) is
* released, you break LocalValue isolation for any code that runs
* after the coroutine suspends but before the lock is dropped.
*/
//
namespace xrpl {
/**
* Construct a CoroTaskRunner. Sets running_ to false; does not
* create the coroutine. Call init() afterwards.
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), running_(false)
{
}
/**
* Initialize with a coroutine-returning callable.
* Stores the callable on the heap (FuncStore) so it outlives the
* coroutine frame. Coroutine frames store a reference to the
* callable's implicit object parameter (the lambda). If the callable
* is a temporary, that reference dangles after the caller returns.
* Keeping the callable alive here ensures the coroutine's captures
* remain valid.
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
using Fn = std::decay_t<F>;
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
task_ = store->func(shared_from_this());
storedFunc_ = std::move(store);
}
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
*/
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
/**
* Decrement nSuspend_ without resuming.
*/
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
/**
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
* before the coroutine actually suspends. The caller must later call
* post() or resume() to continue execution.
*
* @return Awaiter for use with `co_await runner->suspend()`
*/
inline auto
JobQueue::CoroTaskRunner::suspend()
{
/**
* Custom awaiter for suspend(). Always suspends (await_ready
* returns false) and increments nSuspend_ in await_suspend().
*/
struct SuspendAwaiter
{
CoroTaskRunner& runner_; // The runner that owns this coroutine.
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Called when the coroutine suspends. Increments nSuspend_
* so the JobQueue knows a coroutine is waiting.
*/
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
/**
* Schedule coroutine resumption as a job on the JobQueue.
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
* destroyed while the job is queued but not yet executed.
*
* @return false if the JobQueue rejected the job (shutting down)
*/
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
/**
* Resume the coroutine on the current thread.
*
* Steps:
* 1. Set running_ = true (under mutex_run_)
* 2. Decrement nSuspend_ (under jq_.m_mutex)
* 3. Swap in this coroutine's LocalValues for thread-local isolation
* 4. Resume the coroutine handle (under mutex_)
* 5. Swap out LocalValues, restoring the thread's previous state
* 6. Set running_ = false and notify join() waiters
*/
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
if (task_.done())
{
#ifndef NDEBUG
finished_ = true;
#endif
// Destroy the coroutine frame to break the shared_ptr cycle:
// frame -> lambda captures shared_ptr<CoroTaskRunner> -> this.
// Also release the heap-stored callable (no longer needed).
task_ = {};
storedFunc_.reset();
}
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
/**
* @return true if the coroutine has not yet run to completion
*/
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
// After normal completion, task_ is reset to break the shared_ptr cycle
// (handle_ becomes null). A null handle means the coroutine is done.
return task_.handle() && !task_.done();
}
/**
* Handle early termination when the coroutine never ran (e.g. JobQueue
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
*/
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
// Destroy the coroutine frame to break a potential shared_ptr cycle.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Without this, the frame holds a shared_ptr
// back to this CoroTaskRunner, creating an unreachable reference cycle.
task_ = {};
storedFunc_.reset();
}
/**
* Block until the coroutine finishes its current execution slice.
* Uses cv_ + mutex_run_ to wait until running_ == false.
*/
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -2,13 +2,13 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
#include <xrpl/json/json_value.h>
#include <boost/coroutine/all.hpp>
#include <coroutine>
#include <set>
namespace xrpl {
@@ -18,10 +18,6 @@ class PerfLog;
}
class Logs;
struct Coro_create_t
{
explicit Coro_create_t() = default;
};
/** A pool of threads to perform work.
@@ -36,85 +32,382 @@ struct Coro_create_t
class JobQueue : private Workers::Callback
{
public:
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
/** C++20 coroutine lifecycle manager.
*
* Class / Inheritance / Dependency Diagram
* =========================================
*
* std::enable_shared_from_this<CoroTaskRunner>
* ^
* | (public inheritance)
* |
* CoroTaskRunner
* +---------------------------------------------------+
* | - lvs_ : detail::LocalValues |
* | - jq_ : JobQueue& |
* | - type_ : JobType |
* | - name_ : std::string |
* | - running_ : bool |
* | - mutex_ : std::mutex (coroutine guard) |
* | - mutex_run_ : std::mutex (join guard) |
* | - cv_ : condition_variable |
* | - task_ : CoroTask<void> |
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
* +---------------------------------------------------+
* | + init(F&&) : set up coroutine callable |
* | + onSuspend() : ++jq_.nSuspend_ |
* | + onUndoSuspend() : --jq_.nSuspend_ |
* | + suspend() : returns SuspendAwaiter |
* | + post() : schedule resume on JobQueue |
* | + resume() : resume coroutine on caller |
* | + runnable() : !task_.done() |
* | + expectEarlyExit() : teardown for failed post |
* | + join() : block until not running |
* +---------------------------------------------------+
* | |
* | owns | references
* v v
* CoroTask<void> JobQueue
* (coroutine frame) (thread pool + nSuspend_)
*
* FuncBase / FuncStore<F> (type-erased heap storage
* for the coroutine lambda)
*
* Coroutine Lifecycle (Control Flow)
* ===================================
*
* Caller thread JobQueue worker thread
* ------------- ----------------------
* postCoroTask(f)
* |
* +-- make_shared<CoroTaskRunner>
* +-- init(f)
* | +-- store lambda on heap (FuncStore)
* | +-- task_ = f(shared_from_this())
* | [coroutine created, suspended at initial_suspend]
* +-- ++nSuspend_ (lazy start counts as suspended)
* +-- post()
* | +-- addJob(type_, [resume]{})
* | resume()
* | |
* | +-- running_ = true
* | +-- --nSuspend_
* | +-- swap in LocalValues
* | +-- task_.handle().resume()
* | | [coroutine body runs]
* | | ...
* | | co_await suspend()
* | | +-- ++nSuspend_
* | | [coroutine suspends]
* | +-- swap out LocalValues
* | +-- running_ = false
* | +-- cv_.notify_all()
* |
* post() <-- called externally or by JobQueueAwaiter
* +-- addJob(type_, [resume]{})
* resume()
* |
* +-- [coroutine body continues]
* +-- co_return
* +-- running_ = false
* +-- cv_.notify_all()
* join()
* +-- cv_.wait([]{!running_})
* +-- [done]
*
* Usage Examples
* ==============
*
* 1. Fire-and-forget coroutine (most common pattern):
*
* jq.postCoroTask(jtCLIENT, "MyWork",
* [](auto runner) -> CoroTask<void> {
* doSomeWork();
* co_await runner->suspend(); // yield to other jobs
* doMoreWork();
* co_return;
* });
*
* 2. Manually controlling suspend / resume (external trigger):
*
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
* [&result](auto runner) -> CoroTask<void> {
* startAsyncOperation(callback);
* co_await runner->suspend();
* // callback called runner->post() to get here
* result = collectResult();
* co_return;
* });
* // ... later, from the callback:
* runner->post(); // reschedule the coroutine on the JobQueue
*
* 3. Using JobQueueAwaiter for automatic suspend + repost:
*
* jq.postCoroTask(jtCLIENT, "AutoRepost",
* [](auto runner) -> CoroTask<void> {
* step1();
* co_await JobQueueAwaiter{runner}; // yield + auto-repost
* step2();
* co_await JobQueueAwaiter{runner};
* step3();
* co_return;
* });
*
* 4. Checking shutdown after co_await (cooperative cancellation):
*
* jq.postCoroTask(jtCLIENT, "Cancellable",
* [&jq](auto runner) -> CoroTask<void> {
* while (moreWork()) {
* co_await JobQueueAwaiter{runner};
* if (jq.isStopping())
* co_return; // bail out cleanly
* processNextItem();
* }
* co_return;
* });
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Calling suspend() without a matching post()/resume().
* After co_await runner->suspend(), the coroutine is parked and
* nSuspend_ is incremented. If nothing ever calls post() or
* resume(), the coroutine is leaked and JobQueue::stop() will
* hang forever waiting for nSuspend_ to reach zero.
*
* BUG-RISK: Calling post() on an already-running coroutine.
* post() schedules a resume() job. If the coroutine has not
* actually suspended yet (no co_await executed), the resume job
* will try to call handle().resume() while the coroutine is still
* running on another thread. This is UB. The mutex_ prevents
* data corruption but the logic is wrong — always co_await
* suspend() before calling post(). (The test testIncorrectOrder
* shows this works only because mutex_ serializes the calls.)
*
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
* The CoroTaskRunner destructor asserts (!finished_ is false).
* If you let the last shared_ptr die while the coroutine is still
* running or suspended, you get an assertion failure in debug and
* UB in release. Always call join() or expectEarlyExit() first.
*
* BUG-RISK: Lambda captures outliving the coroutine frame.
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
* to prevent dangling. But objects captured by pointer still need
* their own lifetime management. If you capture a raw pointer to
* a stack variable, and the stack frame exits before the coroutine
* finishes, the pointer dangles. Use shared_ptr or ensure the
* pointed-to object outlives the coroutine.
*
* BUG-RISK: Forgetting co_return in a void coroutine.
* If the coroutine body falls off the end without co_return,
* the compiler may silently treat it as co_return (per standard),
* but some compilers warn. Always write explicit co_return.
*
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
* The task_ member is CoroTask<void>. To return values from
* the top-level coroutine, write through a captured pointer
* (as the tests demonstrate), or co_await inner CoroTask<T>
* coroutines that return values.
*
* LIMITATION: One coroutine per CoroTaskRunner.
* init() must be called exactly once. You cannot reuse a
* CoroTaskRunner to run a second coroutine. Create a new one
* via postCoroTask() instead.
*
* LIMITATION: No timeout on join().
* join() blocks indefinitely. If the coroutine is suspended
* and never posted, join() will deadlock. Use timed waits
* on the gate pattern (condition_variable + wait_for) in tests.
*/
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
{
private:
// Per-coroutine thread-local storage. Swapped in before resume()
// and swapped out after, so each coroutine sees its own LocalValue
// state regardless of which worker thread executes it.
detail::LocalValues lvs_;
// Back-reference to the owning JobQueue. Used to post jobs,
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
JobQueue& jq_;
// Job type passed to addJob() when posting this coroutine.
JobType type_;
// Human-readable name for this coroutine job (for logging).
std::string name_;
// True while the coroutine is actively executing on a thread.
// Guarded by mutex_run_. join() blocks until this is false.
bool running_;
// Guards task_.handle().resume() to prevent the coroutine from
// running on two threads simultaneously. Handles the race where
// post() enqueues a resume before the coroutine has actually
// suspended (post-before-suspend pattern).
std::mutex mutex_;
// Guards running_ flag. Used with cv_ for join() to wait
// until the coroutine finishes its current execution slice.
std::mutex mutex_run_;
// Notified when running_ transitions to false, allowing
// join() waiters to wake up.
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
// The coroutine handle wrapper. Owns the coroutine frame.
// Set by init(), reset to empty by expectEarlyExit() on
// early termination.
CoroTask<void> task_;
/**
* Type-erased base for heap-stored callables.
* Prevents the coroutine lambda from being destroyed before
* the coroutine frame is done with it.
*
* @see FuncStore
*/
struct FuncBase
{
virtual ~FuncBase() = default;
};
/**
* Concrete type-erased storage for a callable of type F.
* The coroutine frame stores a reference to the lambda's implicit
* object parameter. If the lambda is a temporary, that reference
* dangles after the call returns. FuncStore keeps it alive on
* the heap for the lifetime of the CoroTaskRunner.
*/
template <class F>
struct FuncStore : FuncBase
{
F func; // The stored callable (coroutine lambda).
explicit FuncStore(F&& f) : func(std::move(f))
{
}
};
// Heap-allocated callable storage. Set by init(), ensures the
// lambda outlives the coroutine frame that references it.
std::unique_ptr<FuncBase> storedFunc_;
#ifndef NDEBUG
// Debug-only flag. True once the coroutine has completed or
// expectEarlyExit() was called. Asserted in the destructor
// to catch leaked runners.
bool finished_ = false;
#endif
public:
// Private: Used in the implementation
/**
* Tag type for private construction. Prevents external code
* from constructing CoroTaskRunner directly. Use postCoroTask().
*/
struct create_t
{
explicit create_t() = default;
};
/**
* Construct a CoroTaskRunner. Private by convention (create_t tag).
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
~CoroTaskRunner();
/**
* Initialize with a coroutine-returning callable.
* Must be called exactly once, after the object is managed by
* shared_ptr (because init uses shared_from_this internally).
* This is handled automatically by postCoroTask().
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
Coro(Coro_create_t, JobQueue&, JobType, std::string const&, F&&);
// Not copy-constructible or assignable
Coro(Coro const&) = delete;
Coro&
operator=(Coro const&) = delete;
~Coro();
/** Suspend coroutine execution.
Effects:
The coroutine's stack is saved.
The associated Job thread is released.
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding
post.
*/
void
yield() const;
init(F&& f);
/** Schedule coroutine execution.
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the
statement after the previous call to yield. Undefined behavior if
called after the coroutine has completed with a return (as opposed to
a yield()). Undefined behavior if post() or resume() called
consecutively without a corresponding yield.
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
* Called when the coroutine is about to suspend. Every call
* must be balanced by a corresponding decrement (via resume()
* or onUndoSuspend()), or JobQueue::stop() will hang.
*/
void
onSuspend();
@return true if the Coro's job is added to the JobQueue.
*/
/**
* Decrement nSuspend_ without resuming.
* Used to undo onSuspend() when a scheduled post() fails
* (e.g. JobQueue is stopping).
*/
void
onUndoSuspend();
/**
* Suspend the coroutine.
* The awaiter's await_suspend() increments nSuspend_ before the
* coroutine actually suspends. The caller must later call post()
* or resume() to continue execution.
*
* @return An awaiter for use with `co_await runner->suspend()`
*/
auto
suspend();
/**
* Schedule coroutine resumption as a job on the JobQueue.
* Captures shared_from_this() to prevent this runner from being
* destroyed while the job is queued.
*
* @return true if the job was accepted; false if the JobQueue
* is stopping (caller must handle cleanup)
*/
bool
post();
/** Resume coroutine execution.
Effects:
The coroutine continues execution from where it last left off
using this same thread.
Undefined behavior if called after the coroutine has completed
with a return (as opposed to a yield()).
Undefined behavior if resume() or post() called consecutively
without a corresponding yield.
*/
/**
* Resume the coroutine on the current thread.
* Decrements nSuspend_, swaps in LocalValues, resumes the
* coroutine handle, swaps out LocalValues, and notifies join()
* waiters. Lock ordering: mutex_run_ -> jq_.m_mutex -> mutex_.
*/
void
resume();
/** Returns true if the Coro is still runnable (has not returned). */
/**
* @return true if the coroutine has not yet run to completion
*/
bool
runnable() const;
/** Once called, the Coro allows early exit without an assert. */
/**
* Handle early termination when the coroutine never ran.
* Decrements nSuspend_ and destroys the coroutine frame to
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
* Called by postCoroTask() when post() fails.
*/
void
expectEarlyExit();
/** Waits until coroutine returns from the user function. */
/**
* Block until the coroutine finishes its current execution slice.
* Uses cv_ + mutex_run_ to wait until running_ == false.
* Warning: deadlocks if the coroutine is suspended and never posted.
*/
void
join();
};
@@ -152,18 +445,18 @@ public:
return false;
}
/** Creates a coroutine and adds a job to the queue which will run it.
/** Creates a C++20 coroutine and adds a job to the queue to run it.
@param t The type of job.
@param name Name of the job.
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the
job executes.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@return shared_ptr to posted Coro. nullptr if post was not successful.
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
*/
template <class F>
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
@@ -217,8 +510,6 @@ public:
isStopped() const;
private:
friend class Coro;
using JobDataMap = std::map<JobType, JobTypeData>;
beast::Journal m_journal;
@@ -319,86 +610,59 @@ private:
getJobLimit(JobType type);
};
/*
An RPC command is received and is handled via ServerHandler(HTTP) or
Handler(websocket), depending on the connection type. The handler then calls
the JobQueue::postCoro() method to create a coroutine and run it at a later
point. This frees up the handler thread and allows it to continue handling
other requests while the RPC command completes its work asynchronously.
postCoro() creates a Coro object. When the Coro ctor is called, and its
coro_ member is initialized (a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note
here that construction of a boost pull_type automatically passes execution to
the coroutine. A pull_type object automatically generates a push_type that is
passed as a parameter (do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, Coro::yield_ is assigned the push_type
parameter (do_yield) address and called (yield()) so we can return execution
back to the caller's stack.
postCoro() then calls Coro::post(), which schedules a job on the job
queue to continue execution of the coroutine in a JobQueue worker thread at
some later time. When the job runs, we lock on the Coro::mutex_ and call
coro_ which continues where we had left off. Since we the last thing we did
in coro_ was call yield(), the next thing we continue with is calling the
function param f, that was passed into Coro ctor. It is within this
function body that the caller specifies what he would like to do while
running in the coroutine and allow them to suspend and resume execution.
A task that relies on other events to complete, such as path finding, calls
Coro::yield() to suspend its execution while waiting on those events to
complete and continue when signaled via the Coro::post() method.
There is a potential race condition that exists here where post() can get
called before yield() after f is called. Technically the problem only occurs
if the job that post() scheduled is executed before yield() is called.
If the post() job were to be executed before yield(), undefined behavior
would occur. The lock ensures that coro_ is not called again until we exit
the coroutine. At which point a scheduled resume() job waiting on the lock
would gain entry, harmlessly call coro_ and immediately return as we have
already completed the coroutine.
The race condition occurs as follows:
1- The coroutine is running.
2- The coroutine is about to suspend, but before it can do so, it must
arrange for some event to wake it up.
3- The coroutine arranges for some event to wake it up.
4- Before the coroutine can suspend, that event occurs and the
resumption of the coroutine is scheduled on the job queue. 5- Again, before
the coroutine can suspend, the resumption of the coroutine is dispatched. 6-
Again, before the coroutine can suspend, the resumption code runs the
coroutine.
The coroutine is now running in two threads.
The lock prevents this from happening as step 6 will block until the
lock is released which only happens after the coroutine completes.
*/
} // namespace xrpl
#include <xrpl/core/Coro.ipp>
#include <xrpl/core/CoroTaskRunner.ipp>
namespace xrpl {
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
//
// Control Flow
// ============
//
// postCoroTask(t, name, f)
// |
// +-- 1. Create CoroTaskRunner (shared_ptr, ref-counted)
// |
// +-- 2. runner->init(f)
// | +-- Heap-allocate the lambda (FuncStore) to prevent
// | | dangling captures in the coroutine frame
// | +-- task_ = f(shared_from_this())
// | [coroutine created but NOT started — lazy initial_suspend]
// |
// +-- 3. ++nSuspend_
// | The coroutine is "suspended" from the JobQueue's perspective
// | even though it hasn't run yet — this keeps the JQ shutdown
// | logic correct (it waits for nSuspend_ to reach 0).
// |
// +-- 4. runner->post()
// | +-- success: job queued, worker will call resume()
// | | return runner to caller
// | +-- failure: JQ is stopping
// | +-- runner->expectEarlyExit()
// | | --nSuspend_, destroy coroutine frame
// | +-- return nullptr
//
template <class F>
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
{
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
*/
auto coro = std::make_shared<Coro>(Coro_create_t{}, *this, t, name, std::forward<F>(f));
if (!coro->post())
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
// Account for the initial suspension (CoroTask uses lazy start).
{
// The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
std::lock_guard lock(m_mutex);
++nSuspend_;
}
return coro;
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
}
} // namespace xrpl

View File

@@ -0,0 +1,174 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/**
* Awaiter that suspends and immediately reschedules on the JobQueue.
* Equivalent to calling yield() followed by post() in the old Coro API.
*
* Usage:
* co_await JobQueueAwaiter{runner};
*
* What it waits for: The coroutine is re-queued as a job and resumes
* when a worker thread picks it up.
*
* Which thread resumes: A JobQueue worker thread.
*
* What await_resume() returns: void.
*
* Dependency Diagram
* ==================
*
* JobQueueAwaiter
* +----------------------------------------------+
* | + runner : shared_ptr<CoroTaskRunner> |
* +----------------------------------------------+
* | + await_ready() -> false (always suspend) |
* | + await_suspend() -> bool (suspend or cancel) |
* | + await_resume() -> void |
* +----------------------------------------------+
* | |
* | uses | uses
* v v
* CoroTaskRunner JobQueue
* .onSuspend() (via runner->post() -> addJob)
* .onUndoSuspend()
* .post()
*
* Control Flow (await_suspend)
* ============================
*
* co_await JobQueueAwaiter{runner}
* |
* +-- await_ready() -> false
* +-- await_suspend(handle)
* |
* +-- runner->onSuspend() // ++nSuspend_
* +-- runner->post() // addJob to JobQueue
* | |
* | +-- success? return true // coroutine stays suspended
* | | // worker thread will call resume()
* | +-- failure? (JQ stopping)
* | +-- runner->onUndoSuspend() // --nSuspend_
* | +-- return false // coroutine continues immediately
* | // so it can clean up and co_return
*
* Usage Examples
* ==============
*
* 1. Yield and auto-repost (most common -- replaces yield() + post()):
*
* CoroTask<void> handler(auto runner) {
* doPartA();
* co_await JobQueueAwaiter{runner}; // yield + repost
* doPartB(); // runs on a worker thread
* co_return;
* }
*
* 2. Multiple yield points in a loop:
*
* CoroTask<void> batchProcessor(auto runner) {
* for (auto& item : items) {
* process(item);
* co_await JobQueueAwaiter{runner}; // let other jobs run
* }
* co_return;
* }
*
* 3. Graceful shutdown -- checking after resume:
*
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
* while (hasWork()) {
* co_await JobQueueAwaiter{runner};
* // If JQ is stopping, await_suspend returns false and
* // the coroutine continues immediately without re-queuing.
* // Always check isStopping() to decide whether to proceed:
* if (jq.isStopping())
* co_return;
* doNextChunk();
* }
* co_return;
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Using a stale or null runner.
* The runner shared_ptr must be valid and point to the CoroTaskRunner
* that owns the coroutine currently executing. Passing a runner from
* a different coroutine, or a default-constructed shared_ptr, is UB.
*
* BUG-RISK: Assuming resume happens on the same thread.
* After co_await JobQueueAwaiter, the coroutine resumes on whatever
* worker thread picks up the job. Do not rely on thread-local state
* unless it is managed through LocalValue (which CoroTaskRunner
* automatically swaps in/out).
*
* BUG-RISK: Ignoring the shutdown path.
* When the JobQueue is stopping, post() fails and await_suspend()
* returns false (coroutine does NOT actually suspend). The coroutine
* body continues immediately on the same thread. If your code after
* co_await assumes it was re-queued and is running on a worker thread,
* that assumption breaks during shutdown. Always handle the "JQ is
* stopping" case, either by checking jq.isStopping() or by letting
* the coroutine fall through to co_return naturally.
*
* DIFFERENCE from runner->suspend() + runner->post():
* JobQueueAwaiter combines both in one atomic operation. With the
* manual suspend()/post() pattern, there is a window between the
* two calls where an external event could race. JobQueueAwaiter
* removes that window -- onSuspend() and post() happen within the
* same await_suspend() call while the coroutine is guaranteed to
* be suspended. Prefer JobQueueAwaiter unless you need an external
* party to decide *when* to call post().
*/
struct JobQueueAwaiter
{
// The CoroTaskRunner that owns the currently executing coroutine.
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Increment nSuspend (equivalent to yield()) and schedule resume
* on the JobQueue (equivalent to post()). If the JobQueue is
* stopping, undoes the suspend count and returns false so the
* coroutine continues immediately and can clean up.
*
* @return true if coroutine should stay suspended (job posted);
* false if coroutine should continue (JQ stopping)
*/
bool
await_suspend(std::coroutine_handle<>)
{
runner->onSuspend();
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// don't actually suspend — the coroutine continues
// immediately so it can clean up and co_return.
runner->onUndoSuspend();
return false;
}
return true;
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -1,19 +1,20 @@
#pragma once
#include <boost/beast/core/string.hpp>
#include <functional>
#include <string>
#include <string_view>
namespace Json {
class Value;
using Output = std::function<void(std::string_view const&)>;
using Output = std::function<void(boost::beast::string_view const&)>;
inline Output
stringOutput(std::string& s)
{
return [&](std::string_view const& b) { s.append(b.data(), b.size()); };
return [&](boost::beast::string_view const& b) { s.append(b.data(), b.size()); };
}
/** Writes a minimal representation of a Json value to an Output in O(n) time.

View File

@@ -17,7 +17,6 @@
#include <functional>
#include <list>
#include <string_view>
namespace xrpl {
@@ -49,7 +48,8 @@ private:
bool ping_active_ = false;
boost::beast::websocket::ping_data payload_;
error_code ec_;
std::function<void(boost::beast::websocket::frame_type, std::string_view)> control_callback_;
std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)>
control_callback_;
public:
template <class Body, class Headers>
@@ -137,7 +137,7 @@ protected:
on_ping(error_code const& ec);
void
on_ping_pong(boost::beast::websocket::frame_type kind, std::string_view payload);
on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload);
void
on_timer(error_code ec);
@@ -414,11 +414,11 @@ template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping_pong(
boost::beast::websocket::frame_type kind,
std::string_view payload)
boost::beast::string_view payload)
{
if (kind == boost::beast::websocket::frame_type::pong)
{
std::string_view p(payload_.begin(), payload_.size());
boost::beast::string_view p(payload_.begin());
if (payload == p)
{
close_on_timer_ = false;

View File

@@ -8,7 +8,6 @@
#include <set>
#include <stack>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
@@ -88,14 +87,14 @@ public:
}
void
output(std::string_view const& bytes)
output(boost::beast::string_view const& bytes)
{
markStarted();
output_(bytes);
}
void
stringOutput(std::string_view const& bytes)
stringOutput(boost::beast::string_view const& bytes)
{
markStarted();
std::size_t position = 0, writtenUntil = 0;

View File

@@ -8,6 +8,7 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -131,7 +132,6 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -155,11 +155,11 @@ public:
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;
@@ -240,28 +240,27 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
Json::Value result;
gate g;
// Test RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));
@@ -269,22 +268,22 @@ public:
// Test RPC::Tuning::max_auto_src_cur source currencies.
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
env.trust(Account("alice")["AUD"](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));

View File

@@ -5340,20 +5340,20 @@ class Vault_test : public beast::unit_test::suite
env.close();
// 2. Mantissa larger than uint64 max
env.set_parse_failure_expected(true);
try
{
tx[sfAssetsMaximum] = "18446744073709551617e5"; // uint64 max + 1
env(tx, THISLINE);
BEAST_EXPECT(false);
BEAST_EXPECTS(false, "Expected parse_error for mantissa larger than uint64 max");
}
catch (parse_error const& e)
{
using namespace std::string_literals;
BEAST_EXPECT(
e.what() ==
"invalidParamsField 'tx_json.AssetsMaximum' has invalid "
"data."s);
e.what() == "invalidParamsField 'tx_json.AssetsMaximum' has invalid data."s);
}
env.set_parse_failure_expected(false);
}
}

View File

@@ -0,0 +1,537 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
/**
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
* and JobQueueAwaiter.
*
* Dependency Diagram
* ==================
*
* CoroTask_test
* +-------------------------------------------------+
* | + gate (inner class) : condition_variable helper |
* +-------------------------------------------------+
* | uses
* v
* jtx::Env --> JobQueue::postCoroTask()
* |
* +-- CoroTaskRunner (suspend / post / resume)
* +-- CoroTask<void> / CoroTask<T>
* +-- JobQueueAwaiter
*
* Test Coverage Matrix
* ====================
*
* Test | Primitives exercised
* --------------------------+----------------------------------------------
* testVoidCompletion | CoroTask<void> basic lifecycle
* testCorrectOrder | suspend() -> join() -> post() -> complete
* testIncorrectOrder | post() before suspend() (race-safe path)
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
* testThreadSpecificStorage | LocalValue isolation across coroutines
* testExceptionPropagation | unhandled_exception() in promise_type
* testMultipleYields | N sequential suspend/resume cycles
* testValueReturn | CoroTask<T> co_return value
* testValueException | CoroTask<T> exception via co_await
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
* testShutdownRejection | postCoroTask returns nullptr when stopping
*/
class CoroTask_test : public beast::unit_test::suite
{
public:
/**
* Simple one-shot gate for synchronizing between test thread
* and coroutine worker threads. signal() sets the flag;
* wait_for() blocks until signaled or timeout.
*/
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
/**
* Block until signaled or timeout expires.
*
* @param rel_time Maximum duration to wait
*
* @return true if signaled before timeout
*/
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
/**
* Signal the gate, waking any waiting thread.
*/
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
/**
* CoroTask<void> runs to completion and runner becomes non-runnable.
*/
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* Correct order: suspend, join, post, complete.
* Mirrors existing Coroutine_test::correct_order.
*/
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*rp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
/**
* Incorrect order: post() before suspend(). Verifies the
* race-safe path. Mirrors Coroutine_test::incorrect_order.
*/
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
/**
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
*/
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
*sp = 1;
co_await JobQueueAwaiter{runner};
*sp = 2;
co_await JobQueueAwaiter{runner};
*sp = 3;
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
/**
* Per-coroutine LocalValue isolation. Each coroutine sees its own
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
*/
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTaskTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
/**
* Exception thrown in coroutine body is caught by
* promise_type::unhandled_exception(). Coroutine completes.
*/
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
/**
* Multiple sequential suspend/resume cycles via co_await.
*/
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
*rp = runner;
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> returns a value via co_return. Outer coroutine
* extracts it with co_await.
*/
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
*rp = co_await inner();
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> propagates exceptions from inner coroutines.
* Outer coroutine catches via try/catch around co_await.
*/
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
*cp = true;
}
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> chaining. Nested value-returning coroutines
* compose via co_await.
*/
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [add](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
*rp = co_await mul(3, 4);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
/**
* postCoroTask returns nullptr when JobQueue is stopping.
*/
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -40,6 +40,11 @@ public:
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
void
correct_order()
{
@@ -54,13 +59,15 @@ public:
}));
gate g1, g2;
std::shared_ptr<JobQueue::Coro> c;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
std::shared_ptr<JobQueue::CoroTaskRunner> c;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [cp = &c, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*cp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(g1.wait_for(5s));
c->join();
c->post();
@@ -81,11 +88,17 @@ public:
}));
gate g;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [gp = &g](auto runner) -> CoroTask<void> {
// Schedule a resume before suspending. The posted job
// cannot actually call resume() until the current resume()
// releases CoroTaskRunner::mutex_, which only happens after
// the coroutine suspends at co_await.
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
@@ -101,7 +114,7 @@ public:
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -118,19 +131,23 @@ public:
for (int i = 0; i < N; ++i)
{
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
a[id] = c;
g.signal();
c->yield();
jq.postCoroTask(
jtCLIENT,
"CoroTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
c->yield();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == id);
});
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}

View File

@@ -43,87 +43,91 @@ class JobQueue_test : public beast::unit_test::suite
}
}
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
void
testPostCoro()
testPostCoroTask()
{
jtx::Env env{*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// Test repeated post()s until the Coro completes.
// Test repeated post()s until the coroutine completes.
std::atomic<int> yieldCount{0};
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest1",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
coroCopy->yield();
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest1", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
while (++(*ycp) < 4)
co_await runner->suspend();
co_return;
});
BEAST_EXPECT(coro != nullptr);
BEAST_EXPECT(runner != nullptr);
// Wait for the Job to run and yield.
while (yieldCount == 0)
;
// Now re-post until the Coro says it is done.
// Now re-post until the CoroTaskRunner says it is done.
int old = yieldCount;
while (coro->runnable())
while (runner->runnable())
{
BEAST_EXPECT(coro->post());
BEAST_EXPECT(runner->post());
while (old == yieldCount)
{
}
coro->join();
runner->join();
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// Test repeated resume()s until the Coro completes.
// Test repeated resume()s until the coroutine completes.
int yieldCount{0};
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest2",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
coroCopy->yield();
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest2", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
while (++(*ycp) < 4)
co_await runner->suspend();
co_return;
});
if (!coro)
if (!runner)
{
// There's no good reason we should not get a Coro, but we
// There's no good reason we should not get a runner, but we
// can't continue without one.
BEAST_EXPECT(false);
return;
}
// Wait for the Job to run and yield.
coro->join();
runner->join();
// Now resume until the Coro says it is done.
// Now resume until the CoroTaskRunner says it is done.
int old = yieldCount;
while (coro->runnable())
while (runner->runnable())
{
coro->resume(); // Resume runs synchronously on this thread.
runner->resume(); // Resume runs synchronously on this thread.
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// If the JobQueue is stopped, we should no
// longer be able to add a Coro (and calling postCoro() should
// return false).
// longer be able to post a coroutine (and calling postCoroTask()
// should return nullptr).
using namespace std::chrono_literals;
jQueue.stop();
// The Coro should never run, so having the Coro access this
// The coroutine should never run, so having it access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
auto const coro = jQueue.postCoro(
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
unprotected = false;
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest3", [up = &unprotected](auto) -> CoroTask<void> {
*up = false;
co_return;
});
BEAST_EXPECT(coro == nullptr);
BEAST_EXPECT(runner == nullptr);
}
}
@@ -132,7 +136,7 @@ public:
run() override
{
testAddJob();
testPostCoro();
testPostCoroTask();
}
};

View File

@@ -6,6 +6,7 @@
#include <xrpld/rpc/RPCHandler.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
@@ -193,7 +194,6 @@ AMMTest::find_paths_request(
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;

View File

@@ -1425,7 +1425,6 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
c,
Role::ADMIN,
{},
{},
RPC::apiMaximumSupportedVersion},
jvCommand};

View File

@@ -3,6 +3,7 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/resource/Fees.h>
namespace xrpl {
@@ -99,13 +100,14 @@ GRPCServerImpl::CallData<Request, Response>::process()
// ensures that finished is always true when this CallData object
// is returned as a tag in handleRpcs(), after sending the response
finished_ = true;
auto coro = app_.getJobQueue().postCoro(
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
thisShared->process(coro);
auto runner = app_.getJobQueue().postCoroTask(
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
thisShared->processRequest();
co_return;
});
// If coro is null, then the JobQueue has already been shutdown
if (!coro)
// If runner is null, then the JobQueue has already been shutdown
if (!runner)
{
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
responder_.FinishWithError(status, this);
@@ -114,7 +116,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
GRPCServerImpl::CallData<Request, Response>::processRequest()
{
try
{
@@ -156,7 +158,6 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
request_};

View File

@@ -206,9 +206,12 @@ private:
clone() override;
private:
// process the request. Called inside the coroutine passed to JobQueue
/**
* Process the gRPC request. Called inside the CoroTask lambda
* posted to the JobQueue by process().
*/
void
process(std::shared_ptr<JobQueue::Coro> coro);
processRequest();
// return load type of this RPC
Resource::Charge

View File

@@ -59,7 +59,7 @@ to_string(ProtocolVersion const& p)
}
std::vector<ProtocolVersion>
parseProtocolVersions(std::string_view const& value)
parseProtocolVersions(boost::beast::string_view const& value)
{
static boost::regex re(
"^" // start of line
@@ -130,7 +130,7 @@ negotiateProtocolVersion(std::vector<ProtocolVersion> const& versions)
}
std::optional<ProtocolVersion>
negotiateProtocolVersion(std::string_view const& versions)
negotiateProtocolVersion(boost::beast::string_view const& versions)
{
auto const them = parseProtocolVersions(versions);

View File

@@ -1,9 +1,10 @@
#pragma once
#include <boost/beast/core/string.hpp>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
@@ -38,7 +39,7 @@ to_string(ProtocolVersion const& p);
no duplicates and will be sorted in ascending protocol order.
*/
std::vector<ProtocolVersion>
parseProtocolVersions(std::string_view const& s);
parseProtocolVersions(boost::beast::string_view const& s);
/** Given a list of supported protocol versions, choose the one we prefer. */
std::optional<ProtocolVersion>
@@ -46,7 +47,7 @@ negotiateProtocolVersion(std::vector<ProtocolVersion> const& versions);
/** Given a list of supported protocol versions, choose the one we prefer. */
std::optional<ProtocolVersion>
negotiateProtocolVersion(std::string_view const& versions);
negotiateProtocolVersion(boost::beast::string_view const& versions);
/** The list of all the protocol versions we support. */
std::string const&

View File

@@ -3,7 +3,6 @@
#include <xrpld/rpc/Role.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/server/InfoSub.h>
namespace xrpl {
@@ -24,7 +23,6 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobQueue::Coro> coro{};
InfoSub::pointer infoSub{};
unsigned int apiVersion;
};

View File

@@ -169,13 +169,10 @@ public:
private:
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
void
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
processSession(std::shared_ptr<Session> const&);
void
processRequest(
@@ -183,7 +180,6 @@ private:
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&&,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user);

View File

@@ -14,6 +14,7 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
@@ -34,7 +35,6 @@
#include <algorithm>
#include <memory>
#include <stdexcept>
#include <string_view>
namespace xrpl {
@@ -231,7 +231,7 @@ ServerHandler::onHandoff(
static inline Json::Output
makeOutput(Session& session)
{
return [&](std::string_view const& b) { session.write(b.data(), b.size()); };
return [&](boost::beast::string_view const& b) { session.write(b.data(), b.size()); };
}
static std::map<std::string, std::string>
@@ -285,9 +285,10 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
processSession(detachedSession);
co_return;
});
if (postResult == nullptr)
{
@@ -323,17 +324,18 @@ ServerHandler::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoro(
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
auto const jr = this->processSession(session, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
co_return;
});
if (postResult == nullptr)
{
@@ -374,10 +376,7 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
@@ -444,7 +443,6 @@ ServerHandler::processSession(
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
@@ -515,27 +513,20 @@ ServerHandler::processSession(
return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
ServerHandler::processSession(std::shared_ptr<Session> const& session)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&]() -> std::string_view {
[&] {
auto const iter = session->request().find("X-User");
if (iter != session->request().end())
{
auto const val = iter->value();
return std::string_view(val.data(), val.size());
}
return std::string_view{};
return iter->value();
return boost::beast::string_view{};
}());
if (beast::rfc2616::is_keep_alive(session->request()))
@@ -566,7 +557,6 @@ ServerHandler::processRequest(
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
{
@@ -823,7 +813,6 @@ ServerHandler::processRequest(
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,

View File

@@ -7,6 +7,9 @@
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/resource/Fees.h>
#include <condition_variable>
#include <mutex>
namespace xrpl {
// This interface is deprecated.
@@ -37,98 +40,31 @@ doRipplePathFind(RPC::JsonContext& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
// makeLegacyPathRequest enqueues a path-finding job that runs
// asynchronously. We block this thread with a condition_variable
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
[&]() {
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
std::lock_guard lk(mtx);
pathDone = true;
}
cv.notify_one();
},
context.consumer,
lpLedger,
context.params);
if (request)
{
context.coro->yield();
std::unique_lock lk(mtx);
cv.wait(lk, [&] { return pathDone; });
jvResult = request->doStatus(context.params);
}