mirror of
https://github.com/XRPLF/rippled.git
synced 2026-03-01 18:22:34 +00:00
Compare commits
18 Commits
dangell7/b
...
pratik/std
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
363870eb34 | ||
|
|
926d8128af | ||
|
|
a0a72a6934 | ||
|
|
3ed8a51d48 | ||
|
|
81eb10885b | ||
|
|
8f517e41f6 | ||
|
|
4613377a41 | ||
|
|
51ae47cdc6 | ||
|
|
185921ea94 | ||
|
|
403abe6408 | ||
|
|
404f35d556 | ||
|
|
eb83e111af | ||
|
|
464c09efc7 | ||
|
|
897c75bc6b | ||
|
|
ca15c0efd7 | ||
|
|
bb4bc1d167 | ||
|
|
b9d14fb9e1 | ||
|
|
af30b71043 |
15
.github/workflows/reusable-build-test-config.yml
vendored
15
.github/workflows/reusable-build-test-config.yml
vendored
@@ -229,8 +229,21 @@ jobs:
|
||||
env:
|
||||
BUILD_NPROC: ${{ steps.nproc.outputs.nproc }}
|
||||
run: |
|
||||
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}"
|
||||
set -o pipefail
|
||||
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}" 2>&1 | tee unittest.log
|
||||
|
||||
- name: Show test failure summary
|
||||
if: ${{ failure() && !inputs.build_only }}
|
||||
working-directory: ${{ runner.os == 'Windows' && format('{0}/{1}', env.BUILD_DIR, inputs.build_type) || env.BUILD_DIR }}
|
||||
run: |
|
||||
if [ ! -f unittest.log ]; then
|
||||
echo "unittest.log not found; embedded tests may not have run."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if ! grep -E "failed" unittest.log; then
|
||||
echo "Log present but no failure lines found in unittest.log."
|
||||
fi
|
||||
- name: Debug failure (Linux)
|
||||
if: ${{ failure() && runner.os == 'Linux' && !inputs.build_only }}
|
||||
run: |
|
||||
|
||||
1724
BoostToStdCoroutineSwitchPlan.md
Normal file
1724
BoostToStdCoroutineSwitchPlan.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -71,6 +71,7 @@ words:
|
||||
- coldwallet
|
||||
- compr
|
||||
- conanfile
|
||||
- cppcoro
|
||||
- conanrun
|
||||
- confs
|
||||
- connectability
|
||||
@@ -101,9 +102,11 @@ words:
|
||||
- Falco
|
||||
- finalizers
|
||||
- firewalled
|
||||
- fcontext
|
||||
- fmtdur
|
||||
- fsanitize
|
||||
- funclets
|
||||
- gantt
|
||||
- gcov
|
||||
- gcovr
|
||||
- ghead
|
||||
@@ -185,6 +188,7 @@ words:
|
||||
- ostr
|
||||
- pargs
|
||||
- partitioner
|
||||
- pratik
|
||||
- paychan
|
||||
- paychans
|
||||
- permdex
|
||||
@@ -206,6 +210,7 @@ words:
|
||||
- queuable
|
||||
- Raphson
|
||||
- replayer
|
||||
- repost
|
||||
- rerere
|
||||
- retriable
|
||||
- RIPD
|
||||
@@ -236,6 +241,7 @@ words:
|
||||
- soci
|
||||
- socidb
|
||||
- sslws
|
||||
- stackful
|
||||
- statsd
|
||||
- STATSDCOLLECTOR
|
||||
- stissue
|
||||
|
||||
687
include/xrpl/core/CoroTask.h
Normal file
687
include/xrpl/core/CoroTask.h
Normal file
@@ -0,0 +1,687 @@
|
||||
#pragma once
|
||||
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
template <typename T = void>
|
||||
class CoroTask;
|
||||
|
||||
/**
|
||||
* CoroTask<void> -- coroutine return type for void-returning coroutines.
|
||||
*
|
||||
* Class / Dependency Diagram
|
||||
* ==========================
|
||||
*
|
||||
* CoroTask<void>
|
||||
* +-----------------------------------------------+
|
||||
* | - handle_ : Handle (coroutine_handle<promise>) |
|
||||
* +-----------------------------------------------+
|
||||
* | + handle(), done() |
|
||||
* | + await_ready/suspend/resume (Awaiter iface) |
|
||||
* +-----------------------------------------------+
|
||||
* | owns
|
||||
* v
|
||||
* promise_type
|
||||
* +-----------------------------------------------+
|
||||
* | - exception_ : std::exception_ptr |
|
||||
* | - continuation_ : std::coroutine_handle<> |
|
||||
* +-----------------------------------------------+
|
||||
* | + get_return_object() -> CoroTask |
|
||||
* | + initial_suspend() -> suspend_always (lazy) |
|
||||
* | + final_suspend() -> FinalAwaiter |
|
||||
* | + return_void() |
|
||||
* | + unhandled_exception() |
|
||||
* +-----------------------------------------------+
|
||||
* | returns at final_suspend
|
||||
* v
|
||||
* FinalAwaiter
|
||||
* +-----------------------------------------------+
|
||||
* | await_suspend(h): |
|
||||
* | if continuation_ set -> symmetric transfer |
|
||||
* | else -> noop_coroutine |
|
||||
* +-----------------------------------------------+
|
||||
*
|
||||
* Design Notes
|
||||
* ------------
|
||||
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
|
||||
* body does not execute until the handle is explicitly resumed.
|
||||
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
|
||||
* of void/bool, allowing the scheduler to jump directly to the next
|
||||
* coroutine without growing the call stack.
|
||||
* - Continuation chaining: when one CoroTask is co_await-ed inside
|
||||
* another, the caller's handle is stored as continuation_ so
|
||||
* FinalAwaiter can resume it when this task finishes.
|
||||
* - Move-only: the handle is exclusively owned; copy is deleted.
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Basic void coroutine (the most common case in rippled):
|
||||
*
|
||||
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
|
||||
* // do something
|
||||
* co_await runner->suspend(); // yield control
|
||||
* // resumed later via runner->post() or runner->resume()
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* 2. co_await-ing one CoroTask<void> from another (chaining):
|
||||
*
|
||||
* CoroTask<void> inner() {
|
||||
* // ...
|
||||
* co_return;
|
||||
* }
|
||||
* CoroTask<void> outer() {
|
||||
* co_await inner(); // continuation_ links outer -> inner
|
||||
* co_return; // FinalAwaiter resumes outer
|
||||
* }
|
||||
*
|
||||
* 3. Exceptions propagate through co_await:
|
||||
*
|
||||
* CoroTask<void> failing() {
|
||||
* throw std::runtime_error("oops");
|
||||
* co_return;
|
||||
* }
|
||||
* CoroTask<void> caller() {
|
||||
* try { co_await failing(); }
|
||||
* catch (std::runtime_error const&) { // caught here }
|
||||
* }
|
||||
*
|
||||
* Caveats / Pitfalls
|
||||
* ==================
|
||||
*
|
||||
* BUG-RISK: Dangling references in coroutine parameters.
|
||||
* Coroutine parameters are copied into the frame, but references
|
||||
* are NOT -- they are stored as-is. If the referent goes out of scope
|
||||
* before the coroutine finishes, you get use-after-free.
|
||||
*
|
||||
* // BROKEN -- local dies before coroutine runs:
|
||||
* CoroTask<void> bad(int& ref) { co_return; }
|
||||
* void launch() {
|
||||
* int local = 42;
|
||||
* auto task = bad(local); // frame stores &local
|
||||
* } // local destroyed; frame holds dangling ref
|
||||
*
|
||||
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
|
||||
*
|
||||
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
|
||||
* When a lambda that returns CoroTask captures by reference ([&]),
|
||||
* GCC 14 may generate a corrupted coroutine frame. Always capture
|
||||
* by explicit pointer-to-value instead:
|
||||
*
|
||||
* // BROKEN on GCC 14:
|
||||
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
|
||||
*
|
||||
* // FIX -- capture pointers explicitly:
|
||||
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
|
||||
*
|
||||
* BUG-RISK: Resuming a destroyed or completed CoroTask.
|
||||
* Calling handle().resume() after the coroutine has already run to
|
||||
* completion (done() == true) is undefined behavior. The CoroTaskRunner
|
||||
* guards against this with an XRPL_ASSERT, but standalone usage of
|
||||
* CoroTask must check done() before resuming.
|
||||
*
|
||||
* BUG-RISK: Moving a CoroTask that is being awaited.
|
||||
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
|
||||
* or destroying A will invalidate the continuation link. Never move
|
||||
* or reassign a CoroTask while it is mid-execution or being awaited.
|
||||
*
|
||||
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
|
||||
* There is no built-in notification when the coroutine finishes.
|
||||
* The caller must use external synchronization (e.g. CoroTaskRunner::join
|
||||
* or a gate/condition_variable) to know when it is done.
|
||||
*
|
||||
* LIMITATION: No cancellation token.
|
||||
* There is no way to cancel a suspended CoroTask from outside. The
|
||||
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
|
||||
* after each co_await and co_return early if needed.
|
||||
*
|
||||
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
|
||||
* If a coroutine calls a regular function that wants to "yield", it
|
||||
* cannot. Only the immediate coroutine body can use co_await.
|
||||
* This is acceptable for rippled because all yield() sites are shallow.
|
||||
*/
|
||||
template <>
|
||||
class CoroTask<void>
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
/**
|
||||
* Coroutine promise. Compiler uses this to manage coroutine state.
|
||||
* Stores the exception (if any) and the continuation handle for
|
||||
* symmetric transfer back to the awaiting coroutine.
|
||||
*/
|
||||
struct promise_type
|
||||
{
|
||||
// Captured exception from the coroutine body, rethrown in
|
||||
// await_resume() when this task is co_await-ed by a caller.
|
||||
std::exception_ptr exception_;
|
||||
|
||||
// Handle to the coroutine that is co_await-ing this task.
|
||||
// Set by await_suspend(). FinalAwaiter uses it for symmetric
|
||||
// transfer back to the caller. Null if this is a top-level task.
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
/**
|
||||
* Create the CoroTask return object.
|
||||
* Called by the compiler at coroutine creation.
|
||||
*/
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
/**
|
||||
* Lazy start. The coroutine body does not execute until the
|
||||
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
|
||||
*/
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaiter returned by final_suspend(). Uses symmetric transfer:
|
||||
* if a continuation exists, transfers control directly to it
|
||||
* (tail-call, no stack growth). Otherwise returns noop_coroutine
|
||||
* so the coroutine frame stays alive for the owner to destroy.
|
||||
*/
|
||||
struct FinalAwaiter
|
||||
{
|
||||
/**
|
||||
* Always false. We need await_suspend to run for
|
||||
* symmetric transfer.
|
||||
*/
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Symmetric transfer: returns the continuation handle so
|
||||
* the compiler emits a tail-call instead of a nested resume.
|
||||
* If no continuation is set, returns noop_coroutine to
|
||||
* suspend at final_suspend without destroying the frame.
|
||||
*
|
||||
* @param h Handle to this completing coroutine
|
||||
*
|
||||
* @return Continuation handle, or noop_coroutine
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns FinalAwaiter for symmetric transfer at coroutine end.
|
||||
*/
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the compiler for `co_return;` (void coroutine).
|
||||
*/
|
||||
void
|
||||
return_void()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the compiler when an exception escapes the coroutine
|
||||
* body. Captures it for later rethrowing in await_resume().
|
||||
*/
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
exception_ = std::current_exception();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Default constructor. Creates an empty (null handle) task.
|
||||
*/
|
||||
CoroTask() = default;
|
||||
|
||||
/**
|
||||
* Takes ownership of a compiler-generated coroutine handle.
|
||||
*
|
||||
* @param h Coroutine handle to own
|
||||
*/
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the coroutine frame if this task owns one.
|
||||
*/
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Move constructor. Transfers handle ownership, leaves other empty.
|
||||
*/
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Move assignment. Destroys current frame (if any), takes other's.
|
||||
*/
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
/**
|
||||
* @return The underlying coroutine_handle
|
||||
*/
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has run to completion (or thrown)
|
||||
*/
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
// -- Awaiter interface: allows `co_await someCoroTask;` --
|
||||
|
||||
/**
|
||||
* Always false. This task is lazy, so co_await always suspends
|
||||
* the caller to set up the continuation link.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the caller's handle as our continuation, then returns
|
||||
* our handle for symmetric transfer (caller suspends, we resume).
|
||||
*
|
||||
* @param caller Handle of the coroutine doing co_await on us
|
||||
*
|
||||
* @return Our handle for symmetric transfer
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_; // Symmetric transfer
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the caller resumes after co_await. Rethrows any
|
||||
* exception captured by unhandled_exception().
|
||||
*/
|
||||
void
|
||||
await_resume()
|
||||
{
|
||||
if (auto& ep = handle_.promise().exception_)
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
|
||||
private:
|
||||
// Exclusively-owned coroutine handle. Null after move or default
|
||||
// construction. Destroyed in the destructor.
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
/**
|
||||
* CoroTask<T> -- coroutine return type for value-returning coroutines.
|
||||
*
|
||||
* Class / Dependency Diagram
|
||||
* ==========================
|
||||
*
|
||||
* CoroTask<T>
|
||||
* +-----------------------------------------------+
|
||||
* | - handle_ : Handle (coroutine_handle<promise>) |
|
||||
* +-----------------------------------------------+
|
||||
* | + handle(), done() |
|
||||
* | + await_ready/suspend/resume (Awaiter iface) |
|
||||
* +-----------------------------------------------+
|
||||
* | owns
|
||||
* v
|
||||
* promise_type
|
||||
* +-----------------------------------------------+
|
||||
* | - result_ : variant<monostate, T, |
|
||||
* | exception_ptr> |
|
||||
* | - continuation_ : std::coroutine_handle<> |
|
||||
* +-----------------------------------------------+
|
||||
* | + get_return_object() -> CoroTask |
|
||||
* | + initial_suspend() -> suspend_always (lazy) |
|
||||
* | + final_suspend() -> FinalAwaiter |
|
||||
* | + return_value(T) -> stores in result_[1] |
|
||||
* | + unhandled_exception -> stores in result_[2] |
|
||||
* +-----------------------------------------------+
|
||||
* | returns at final_suspend
|
||||
* v
|
||||
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
|
||||
*
|
||||
* Value Extraction
|
||||
* ----------------
|
||||
* await_resume() inspects the variant:
|
||||
* - index 2 (exception_ptr) -> rethrow
|
||||
* - index 1 (T) -> return value via move
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Simple value return:
|
||||
*
|
||||
* CoroTask<int> computeAnswer() { co_return 42; }
|
||||
*
|
||||
* CoroTask<void> caller() {
|
||||
* int v = co_await computeAnswer(); // v == 42
|
||||
* }
|
||||
*
|
||||
* 2. Chaining value-returning coroutines:
|
||||
*
|
||||
* CoroTask<int> add(int a, int b) { co_return a + b; }
|
||||
* CoroTask<int> doubleSum(int a, int b) {
|
||||
* int s = co_await add(a, b);
|
||||
* co_return s * 2;
|
||||
* }
|
||||
*
|
||||
* 3. Exception propagation from inner to outer:
|
||||
*
|
||||
* CoroTask<int> failing() {
|
||||
* throw std::runtime_error("bad");
|
||||
* co_return 0; // never reached
|
||||
* }
|
||||
* CoroTask<void> caller() {
|
||||
* try {
|
||||
* int v = co_await failing(); // throws here
|
||||
* } catch (std::runtime_error const& e) {
|
||||
* // e.what() == "bad"
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
|
||||
* ================================================================
|
||||
*
|
||||
* BUG-RISK: await_resume() moves the value out of the variant.
|
||||
* Calling co_await on the same CoroTask<T> instance twice is undefined
|
||||
* behavior -- the second call will see a moved-from T. CoroTask is
|
||||
* single-shot: one co_return, one co_await.
|
||||
*
|
||||
* BUG-RISK: T must be move-constructible.
|
||||
* return_value(T) takes by value and moves into the variant.
|
||||
* Types that are not movable cannot be used as T.
|
||||
*
|
||||
* LIMITATION: No co_yield support.
|
||||
* CoroTask<T> only supports a single co_return. It does not implement
|
||||
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
|
||||
* compile error. For streaming values, a different return type
|
||||
* (e.g. Generator<T>) would be needed.
|
||||
*
|
||||
* LIMITATION: Result is only accessible via co_await.
|
||||
* There is no .get() or .result() method. The value can only be
|
||||
* extracted by co_await-ing the CoroTask<T> from inside another
|
||||
* coroutine. For extracting results in non-coroutine code, pass a
|
||||
* pointer to the caller and write through it (as the tests do).
|
||||
*/
|
||||
template <typename T>
|
||||
class CoroTask
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
/**
|
||||
* Coroutine promise for value-returning coroutines.
|
||||
* Stores the result as a variant: monostate (not yet set),
|
||||
* T (co_return value), or exception_ptr (unhandled exception).
|
||||
*/
|
||||
struct promise_type
|
||||
{
|
||||
// Tri-state result:
|
||||
// index 0 (monostate) -- coroutine has not yet completed
|
||||
// index 1 (T) -- co_return value stored here
|
||||
// index 2 (exception) -- unhandled exception captured here
|
||||
std::variant<std::monostate, T, std::exception_ptr> result_;
|
||||
|
||||
// Handle to the coroutine co_await-ing this task. Used by
|
||||
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
/**
|
||||
* Create the CoroTask return object.
|
||||
* Called by the compiler at coroutine creation.
|
||||
*/
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
/**
|
||||
* Lazy start. Coroutine body does not run until explicitly resumed.
|
||||
*/
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Symmetric-transfer awaiter at coroutine completion.
|
||||
* Same pattern as CoroTask<void>::FinalAwaiter.
|
||||
*/
|
||||
struct FinalAwaiter
|
||||
{
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns continuation for symmetric transfer, or
|
||||
* noop_coroutine if this is a top-level task.
|
||||
*
|
||||
* @param h Handle to this completing coroutine
|
||||
*
|
||||
* @return Continuation handle, or noop_coroutine
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the compiler for `co_return value;`.
|
||||
* Moves the value into result_ at index 1.
|
||||
*
|
||||
* @param value The value to store
|
||||
*/
|
||||
void
|
||||
return_value(T value)
|
||||
{
|
||||
result_.template emplace<1>(std::move(value));
|
||||
}
|
||||
|
||||
/**
|
||||
* Captures unhandled exceptions at index 2 of result_.
|
||||
* Rethrown later in await_resume().
|
||||
*/
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
result_.template emplace<2>(std::current_exception());
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Default constructor. Creates an empty (null handle) task.
|
||||
*/
|
||||
CoroTask() = default;
|
||||
|
||||
/**
|
||||
* Takes ownership of a compiler-generated coroutine handle.
|
||||
*
|
||||
* @param h Coroutine handle to own
|
||||
*/
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the coroutine frame if this task owns one.
|
||||
*/
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Move constructor. Transfers handle ownership, leaves other empty.
|
||||
*/
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Move assignment. Destroys current frame (if any), takes other's.
|
||||
*/
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
/**
|
||||
* @return The underlying coroutine_handle
|
||||
*/
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has run to completion (or thrown)
|
||||
*/
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
|
||||
|
||||
/**
|
||||
* Always false. co_await always suspends to set up continuation.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores caller as continuation, returns our handle for
|
||||
* symmetric transfer.
|
||||
*
|
||||
* @param caller Handle of the coroutine doing co_await on us
|
||||
*
|
||||
* @return Our handle for symmetric transfer
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the result: rethrows if exception, otherwise moves
|
||||
* the T value out of the variant. Single-shot: calling twice
|
||||
* on the same task is undefined (moved-from T).
|
||||
*
|
||||
* @return The co_return-ed value
|
||||
*/
|
||||
T
|
||||
await_resume()
|
||||
{
|
||||
auto& result = handle_.promise().result_;
|
||||
if (auto* ep = std::get_if<2>(&result))
|
||||
std::rethrow_exception(*ep);
|
||||
return std::get<1>(std::move(result));
|
||||
}
|
||||
|
||||
private:
|
||||
// Exclusively-owned coroutine handle. Null after move or default
|
||||
// construction. Destroyed in the destructor.
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
329
include/xrpl/core/CoroTaskRunner.ipp
Normal file
329
include/xrpl/core/CoroTaskRunner.ipp
Normal file
@@ -0,0 +1,329 @@
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* @file CoroTaskRunner.ipp
|
||||
*
|
||||
* CoroTaskRunner inline implementation.
|
||||
*
|
||||
* This file contains the business logic for managing C++20 coroutines
|
||||
* on the JobQueue. It is included at the bottom of JobQueue.h.
|
||||
*
|
||||
* Data Flow: suspend / post / resume cycle
|
||||
* =========================================
|
||||
*
|
||||
* coroutine body CoroTaskRunner JobQueue
|
||||
* -------------- -------------- --------
|
||||
* |
|
||||
* co_await runner->suspend()
|
||||
* |
|
||||
* +--- await_suspend ------> onSuspend()
|
||||
* | ++nSuspend_ ------------> nSuspend_
|
||||
* | [coroutine is now suspended]
|
||||
* |
|
||||
* . (externally or by JobQueueAwaiter)
|
||||
* .
|
||||
* +--- (caller calls) -----> post()
|
||||
* | ++runCount_
|
||||
* | addJob(resume) ----------> job enqueued
|
||||
* | |
|
||||
* | [worker picks up]
|
||||
* | |
|
||||
* +--- <----- resume() <-----------------------------------+
|
||||
* | --nSuspend_ ------> nSuspend_
|
||||
* | swap in LocalValues (lvs_)
|
||||
* | task_.handle().resume()
|
||||
* | |
|
||||
* | [coroutine body continues here]
|
||||
* | |
|
||||
* | swap out LocalValues
|
||||
* | --runCount_
|
||||
* | cv_.notify_all()
|
||||
* v
|
||||
*
|
||||
* Thread Safety
|
||||
* =============
|
||||
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
|
||||
* races cannot resume the coroutine while it is still running.
|
||||
* (See the race condition discussion in JobQueue.h)
|
||||
* - mutex_run_ : guards runCount_ counter; used by join() to wait until
|
||||
* all in-flight resume operations complete.
|
||||
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
|
||||
*
|
||||
* Common Mistakes When Modifying This File
|
||||
* =========================================
|
||||
*
|
||||
* 1. Changing lock ordering.
|
||||
* resume() acquires locks in this order: jq_.m_mutex -> mutex_ -> mutex_run_.
|
||||
* post() acquires only mutex_run_. Any new code path that touches these
|
||||
* mutexes must follow the same order to avoid deadlocks.
|
||||
*
|
||||
* 2. Removing the shared_from_this() capture in post().
|
||||
* The lambda passed to addJob captures [this, sp = shared_from_this()].
|
||||
* If you remove sp, 'this' can be destroyed before the job runs,
|
||||
* causing use-after-free. The sp capture is load-bearing.
|
||||
*
|
||||
* 3. Forgetting to decrement nSuspend_ on a new code path.
|
||||
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
|
||||
* suspension path (e.g. a new awaiter) and forget to decrement on resume
|
||||
* or on failure, JobQueue::stop() will hang.
|
||||
*
|
||||
* 4. Calling task_.handle().resume() without holding mutex_.
|
||||
* This allows a race where the coroutine runs on two threads
|
||||
* simultaneously. Always hold mutex_ around resume().
|
||||
*
|
||||
* 5. Swapping LocalValues outside of the mutex_ critical section.
|
||||
* The swap-in and swap-out of LocalValues must bracket the resume()
|
||||
* call. If you move the swap-out before the lock_guard(mutex_) is
|
||||
* released, you break LocalValue isolation for any code that runs
|
||||
* after the coroutine suspends but before the lock is dropped.
|
||||
*/
|
||||
//
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* Construct a CoroTaskRunner. Sets runCount_ to 0; does not
|
||||
* create the coroutine. Call init() afterwards.
|
||||
*
|
||||
* @param jq The JobQueue this coroutine will run on
|
||||
* @param type Job type for scheduling priority
|
||||
* @param name Human-readable name for logging
|
||||
*/
|
||||
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
|
||||
create_t,
|
||||
JobQueue& jq,
|
||||
JobType type,
|
||||
std::string const& name)
|
||||
: jq_(jq), type_(type), name_(name), runCount_(0)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize with a coroutine-returning callable.
|
||||
* Stores the callable on the heap (FuncStore) so it outlives the
|
||||
* coroutine frame. Coroutine frames store a reference to the
|
||||
* callable's implicit object parameter (the lambda). If the callable
|
||||
* is a temporary, that reference dangles after the caller returns.
|
||||
* Keeping the callable alive here ensures the coroutine's captures
|
||||
* remain valid.
|
||||
*
|
||||
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
|
||||
*/
|
||||
template <class F>
|
||||
void
|
||||
JobQueue::CoroTaskRunner::init(F&& f)
|
||||
{
|
||||
using Fn = std::decay_t<F>;
|
||||
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
|
||||
task_ = store->func(shared_from_this());
|
||||
storedFunc_ = std::move(store);
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor. Waits for any in-flight resume() to complete, then
|
||||
* asserts (debug) that the coroutine has finished or
|
||||
* expectEarlyExit() was called.
|
||||
*
|
||||
* The join() call is necessary because with async dispatch the
|
||||
* coroutine runs on a worker thread. The gate signal (which wakes
|
||||
* the test thread) can arrive before resume() has set finished_.
|
||||
* join() synchronizes via mutex_run_, establishing a happens-before
|
||||
* edge: finished_ = true → unlock(mutex_run_) in resume() →
|
||||
* lock(mutex_run_) in join() → read finished_.
|
||||
*/
|
||||
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
join();
|
||||
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
++jq_.nSuspend_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement nSuspend_ without resuming.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onUndoSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
|
||||
* before the coroutine actually suspends. The caller must later call
|
||||
* post() or resume() to continue execution.
|
||||
*
|
||||
* @return Awaiter for use with `co_await runner->suspend()`
|
||||
*/
|
||||
inline auto
|
||||
JobQueue::CoroTaskRunner::suspend()
|
||||
{
|
||||
/**
|
||||
* Custom awaiter for suspend(). Always suspends (await_ready
|
||||
* returns false) and increments nSuspend_ in await_suspend().
|
||||
*/
|
||||
struct SuspendAwaiter
|
||||
{
|
||||
CoroTaskRunner& runner_; // The runner that owns this coroutine.
|
||||
|
||||
/**
|
||||
* Always returns false so the coroutine suspends.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the coroutine suspends. Increments nSuspend_
|
||||
* so the JobQueue knows a coroutine is waiting.
|
||||
*/
|
||||
void
|
||||
await_suspend(std::coroutine_handle<>) const
|
||||
{
|
||||
runner_.onSuspend();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
return SuspendAwaiter{*this};
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule coroutine resumption as a job on the JobQueue.
|
||||
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
|
||||
* destroyed while the job is queued but not yet executed.
|
||||
*
|
||||
* @return false if the JobQueue rejected the job (shutting down)
|
||||
*/
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::post()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
++runCount_;
|
||||
}
|
||||
|
||||
// sp prevents 'this' from being destroyed while the job is pending
|
||||
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// The coroutine will not run. Undo the runCount_ increment.
|
||||
std::lock_guard lk(mutex_run_);
|
||||
--runCount_;
|
||||
cv_.notify_all();
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume the coroutine on the current thread.
|
||||
*
|
||||
* Steps:
|
||||
* 1. Decrement nSuspend_ (under jq_.m_mutex)
|
||||
* 2. Swap in this coroutine's LocalValues for thread-local isolation
|
||||
* 3. Resume the coroutine handle (under mutex_)
|
||||
* 4. Swap out LocalValues, restoring the thread's previous state
|
||||
* 5. Decrement runCount_ and notify join() waiters
|
||||
*
|
||||
* Note: runCount_ is NOT incremented here — post() already did that.
|
||||
* This ensures join() stays blocked for the entire post→resume lifetime.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::resume()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
auto saved = detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(&lvs_);
|
||||
std::lock_guard lock(mutex_);
|
||||
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
|
||||
task_.handle().resume();
|
||||
detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(saved);
|
||||
if (task_.done())
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
finished_ = true;
|
||||
#endif
|
||||
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
|
||||
// Use std::move (not task_ = {}) so task_.handle_ is null BEFORE the
|
||||
// frame is destroyed. operator= would destroy the frame while handle_
|
||||
// still holds the old value -- a re-entrancy hazard on GCC-12 if
|
||||
// frame destruction triggers runner cleanup.
|
||||
[[maybe_unused]] auto completed = std::move(task_);
|
||||
}
|
||||
std::lock_guard lk(mutex_run_);
|
||||
--runCount_;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has not yet run to completion
|
||||
*/
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::runnable() const
|
||||
{
|
||||
// After normal completion, task_ is reset to break the shared_ptr cycle
|
||||
// (handle_ becomes null). A null handle means the coroutine is done.
|
||||
return task_.handle() && !task_.done();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle early termination when the coroutine never ran (e.g. JobQueue
|
||||
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
|
||||
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::expectEarlyExit()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
if (!finished_)
|
||||
#endif
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
#ifndef NDEBUG
|
||||
finished_ = true;
|
||||
#endif
|
||||
}
|
||||
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
|
||||
// The coroutine is at initial_suspend and never ran user code, so
|
||||
// destroying it is safe. Use std::move (not task_ = {}) so
|
||||
// task_.handle_ is null before the frame is destroyed.
|
||||
{
|
||||
[[maybe_unused]] auto completed = std::move(task_);
|
||||
}
|
||||
storedFunc_.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Block until all pending/active resume operations complete.
|
||||
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::join()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_run_);
|
||||
cv_.wait(lk, [this]() { return runCount_ == 0; });
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
#include <xrpl/basics/LocalValue.h>
|
||||
#include <xrpl/core/ClosureCounter.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobTypeData.h>
|
||||
#include <xrpl/core/JobTypes.h>
|
||||
#include <xrpl/core/detail/Workers.h>
|
||||
@@ -9,6 +10,7 @@
|
||||
|
||||
#include <boost/coroutine/all.hpp>
|
||||
|
||||
#include <coroutine>
|
||||
#include <set>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -119,6 +121,395 @@ public:
|
||||
join();
|
||||
};
|
||||
|
||||
/** C++20 coroutine lifecycle manager. Replaces Coro for new code.
|
||||
*
|
||||
* Class / Inheritance / Dependency Diagram
|
||||
* =========================================
|
||||
*
|
||||
* std::enable_shared_from_this<CoroTaskRunner>
|
||||
* ^
|
||||
* | (public inheritance)
|
||||
* |
|
||||
* CoroTaskRunner
|
||||
* +---------------------------------------------------+
|
||||
* | - lvs_ : detail::LocalValues |
|
||||
* | - jq_ : JobQueue& |
|
||||
* | - type_ : JobType |
|
||||
* | - name_ : std::string |
|
||||
* | - runCount_ : int (in-flight resumes) |
|
||||
* | - mutex_ : std::mutex (coroutine guard) |
|
||||
* | - mutex_run_ : std::mutex (join guard) |
|
||||
* | - cv_ : condition_variable |
|
||||
* | - task_ : CoroTask<void> |
|
||||
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
|
||||
* +---------------------------------------------------+
|
||||
* | + init(F&&) : set up coroutine callable |
|
||||
* | + onSuspend() : ++jq_.nSuspend_ |
|
||||
* | + onUndoSuspend() : --jq_.nSuspend_ |
|
||||
* | + suspend() : returns SuspendAwaiter |
|
||||
* | + post() : schedule resume on JobQueue |
|
||||
* | + resume() : resume coroutine on caller |
|
||||
* | + runnable() : !task_.done() |
|
||||
* | + expectEarlyExit() : teardown for failed post |
|
||||
* | + join() : block until not running |
|
||||
* +---------------------------------------------------+
|
||||
* | |
|
||||
* | owns | references
|
||||
* v v
|
||||
* CoroTask<void> JobQueue
|
||||
* (coroutine frame) (thread pool + nSuspend_)
|
||||
*
|
||||
* FuncBase / FuncStore<F> (type-erased heap storage
|
||||
* for the coroutine lambda)
|
||||
*
|
||||
* Coroutine Lifecycle (Control Flow)
|
||||
* ===================================
|
||||
*
|
||||
* Caller thread JobQueue worker thread
|
||||
* ------------- ----------------------
|
||||
* postCoroTask(f)
|
||||
* |
|
||||
* +-- check stopping_ (reject if JQ shutting down)
|
||||
* +-- ++nSuspend_ (lazy start counts as suspended)
|
||||
* +-- make_shared<CoroTaskRunner>
|
||||
* +-- init(f)
|
||||
* | +-- store lambda on heap (FuncStore)
|
||||
* | +-- task_ = f(shared_from_this())
|
||||
* | [coroutine created, suspended at initial_suspend]
|
||||
* +-- post()
|
||||
* | +-- ++runCount_
|
||||
* | +-- addJob(type_, [resume]{})
|
||||
* | resume()
|
||||
* | |
|
||||
* | +-- --nSuspend_
|
||||
* | +-- swap in LocalValues
|
||||
* | +-- task_.handle().resume()
|
||||
* | | [coroutine body runs]
|
||||
* | | ...
|
||||
* | | co_await suspend()
|
||||
* | | +-- ++nSuspend_
|
||||
* | | [coroutine suspends]
|
||||
* | +-- swap out LocalValues
|
||||
* | +-- --runCount_
|
||||
* | +-- cv_.notify_all()
|
||||
* |
|
||||
* post() <-- called externally or by JobQueueAwaiter
|
||||
* +-- ++runCount_
|
||||
* +-- addJob(type_, [resume]{})
|
||||
* resume()
|
||||
* |
|
||||
* +-- [coroutine body continues]
|
||||
* +-- co_return
|
||||
* +-- --runCount_
|
||||
* +-- cv_.notify_all()
|
||||
* join()
|
||||
* +-- cv_.wait([]{runCount_ == 0})
|
||||
* +-- [done]
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Fire-and-forget coroutine (most common pattern):
|
||||
*
|
||||
* jq.postCoroTask(jtCLIENT, "MyWork",
|
||||
* [](auto runner) -> CoroTask<void> {
|
||||
* doSomeWork();
|
||||
* co_await runner->suspend(); // yield to other jobs
|
||||
* doMoreWork();
|
||||
* co_return;
|
||||
* });
|
||||
*
|
||||
* 2. Manually controlling suspend / resume (external trigger):
|
||||
*
|
||||
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
|
||||
* [&result](auto runner) -> CoroTask<void> {
|
||||
* startAsyncOperation(callback);
|
||||
* co_await runner->suspend();
|
||||
* // callback called runner->post() to get here
|
||||
* result = collectResult();
|
||||
* co_return;
|
||||
* });
|
||||
* // ... later, from the callback:
|
||||
* runner->post(); // reschedule the coroutine on the JobQueue
|
||||
*
|
||||
* 3. Using JobQueueAwaiter for automatic suspend + repost:
|
||||
*
|
||||
* jq.postCoroTask(jtCLIENT, "AutoRepost",
|
||||
* [](auto runner) -> CoroTask<void> {
|
||||
* step1();
|
||||
* co_await JobQueueAwaiter{runner}; // yield + auto-repost
|
||||
* step2();
|
||||
* co_await JobQueueAwaiter{runner};
|
||||
* step3();
|
||||
* co_return;
|
||||
* });
|
||||
*
|
||||
* 4. Checking shutdown after co_await (cooperative cancellation):
|
||||
*
|
||||
* jq.postCoroTask(jtCLIENT, "Cancellable",
|
||||
* [&jq](auto runner) -> CoroTask<void> {
|
||||
* while (moreWork()) {
|
||||
* co_await JobQueueAwaiter{runner};
|
||||
* if (jq.isStopping())
|
||||
* co_return; // bail out cleanly
|
||||
* processNextItem();
|
||||
* }
|
||||
* co_return;
|
||||
* });
|
||||
*
|
||||
* Caveats / Pitfalls
|
||||
* ==================
|
||||
*
|
||||
* BUG-RISK: Calling suspend() without a matching post()/resume().
|
||||
* After co_await runner->suspend(), the coroutine is parked and
|
||||
* nSuspend_ is incremented. If nothing ever calls post() or
|
||||
* resume(), the coroutine is leaked and JobQueue::stop() will
|
||||
* hang forever waiting for nSuspend_ to reach zero.
|
||||
*
|
||||
* BUG-RISK: Calling post() on an already-running coroutine.
|
||||
* post() schedules a resume() job. If the coroutine has not
|
||||
* actually suspended yet (no co_await executed), the resume job
|
||||
* will try to call handle().resume() while the coroutine is still
|
||||
* running on another thread. This is UB. The mutex_ prevents
|
||||
* data corruption but the logic is wrong — always co_await
|
||||
* suspend() before calling post(). (The test testIncorrectOrder
|
||||
* shows this works only because mutex_ serializes the calls.)
|
||||
*
|
||||
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
|
||||
* The CoroTaskRunner destructor asserts (!finished_ is false).
|
||||
* If you let the last shared_ptr die while the coroutine is still
|
||||
* running or suspended, you get an assertion failure in debug and
|
||||
* UB in release. Always call join() or expectEarlyExit() first.
|
||||
*
|
||||
* BUG-RISK: Lambda captures outliving the coroutine frame.
|
||||
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
|
||||
* to prevent dangling. But objects captured by pointer still need
|
||||
* their own lifetime management. If you capture a raw pointer to
|
||||
* a stack variable, and the stack frame exits before the coroutine
|
||||
* finishes, the pointer dangles. Use shared_ptr or ensure the
|
||||
* pointed-to object outlives the coroutine.
|
||||
*
|
||||
* BUG-RISK: Forgetting co_return in a void coroutine.
|
||||
* If the coroutine body falls off the end without co_return,
|
||||
* the compiler may silently treat it as co_return (per standard),
|
||||
* but some compilers warn. Always write explicit co_return.
|
||||
*
|
||||
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
|
||||
* The task_ member is CoroTask<void>. To return values from
|
||||
* the top-level coroutine, write through a captured pointer
|
||||
* (as the tests demonstrate), or co_await inner CoroTask<T>
|
||||
* coroutines that return values.
|
||||
*
|
||||
* LIMITATION: One coroutine per CoroTaskRunner.
|
||||
* init() must be called exactly once. You cannot reuse a
|
||||
* CoroTaskRunner to run a second coroutine. Create a new one
|
||||
* via postCoroTask() instead.
|
||||
*
|
||||
* LIMITATION: No timeout on join().
|
||||
* join() blocks indefinitely. If the coroutine is suspended
|
||||
* and never posted, join() will deadlock. Use timed waits
|
||||
* on the gate pattern (condition_variable + wait_for) in tests.
|
||||
*/
|
||||
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
|
||||
{
|
||||
private:
|
||||
// Per-coroutine thread-local storage. Swapped in before resume()
|
||||
// and swapped out after, so each coroutine sees its own LocalValue
|
||||
// state regardless of which worker thread executes it.
|
||||
detail::LocalValues lvs_;
|
||||
|
||||
// Back-reference to the owning JobQueue. Used to post jobs,
|
||||
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
|
||||
JobQueue& jq_;
|
||||
|
||||
// Job type passed to addJob() when posting this coroutine.
|
||||
JobType type_;
|
||||
|
||||
// Human-readable name for this coroutine job (for logging).
|
||||
std::string name_;
|
||||
|
||||
// Number of in-flight resume operations (pending + active).
|
||||
// Incremented by post(), decremented when resume() finishes.
|
||||
// Guarded by mutex_run_. join() blocks until this reaches 0.
|
||||
//
|
||||
// A counter (not a bool) is needed because post() can be called
|
||||
// from within the coroutine body (e.g. via JobQueueAwaiter),
|
||||
// enqueuing a second resume while the first is still running.
|
||||
// A bool would be clobbered: R2.post() sets true, then R1's
|
||||
// cleanup sets false — losing the fact that R2 is still pending.
|
||||
int runCount_;
|
||||
|
||||
// Guards task_.handle().resume() to prevent the coroutine from
|
||||
// running on two threads simultaneously. Handles the race where
|
||||
// post() enqueues a resume before the coroutine has actually
|
||||
// suspended (post-before-suspend pattern).
|
||||
std::mutex mutex_;
|
||||
|
||||
// Guards runCount_. Used with cv_ for join() to wait
|
||||
// until all pending/active resume operations complete.
|
||||
std::mutex mutex_run_;
|
||||
|
||||
// Notified when runCount_ reaches zero, allowing
|
||||
// join() waiters to wake up.
|
||||
std::condition_variable cv_;
|
||||
|
||||
// The coroutine handle wrapper. Owns the coroutine frame.
|
||||
// Set by init(), reset to empty by expectEarlyExit() on
|
||||
// early termination.
|
||||
CoroTask<void> task_;
|
||||
|
||||
/**
|
||||
* Type-erased base for heap-stored callables.
|
||||
* Prevents the coroutine lambda from being destroyed before
|
||||
* the coroutine frame is done with it.
|
||||
*
|
||||
* @see FuncStore
|
||||
*/
|
||||
struct FuncBase
|
||||
{
|
||||
virtual ~FuncBase() = default;
|
||||
};
|
||||
|
||||
/**
|
||||
* Concrete type-erased storage for a callable of type F.
|
||||
* The coroutine frame stores a reference to the lambda's implicit
|
||||
* object parameter. If the lambda is a temporary, that reference
|
||||
* dangles after the call returns. FuncStore keeps it alive on
|
||||
* the heap for the lifetime of the CoroTaskRunner.
|
||||
*/
|
||||
template <class F>
|
||||
struct FuncStore : FuncBase
|
||||
{
|
||||
F func; // The stored callable (coroutine lambda).
|
||||
explicit FuncStore(F&& f) : func(std::move(f))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
// Heap-allocated callable storage. Set by init(), ensures the
|
||||
// lambda outlives the coroutine frame that references it.
|
||||
std::unique_ptr<FuncBase> storedFunc_;
|
||||
|
||||
#ifndef NDEBUG
|
||||
// Debug-only flag. True once the coroutine has completed or
|
||||
// expectEarlyExit() was called. Asserted in the destructor
|
||||
// to catch leaked runners.
|
||||
bool finished_ = false;
|
||||
#endif
|
||||
|
||||
public:
|
||||
/**
|
||||
* Tag type for private construction. Prevents external code
|
||||
* from constructing CoroTaskRunner directly. Use postCoroTask().
|
||||
*/
|
||||
struct create_t
|
||||
{
|
||||
explicit create_t() = default;
|
||||
};
|
||||
|
||||
/**
|
||||
* Construct a CoroTaskRunner. Private by convention (create_t tag).
|
||||
*
|
||||
* @param jq The JobQueue this coroutine will run on
|
||||
* @param type Job type for scheduling priority
|
||||
* @param name Human-readable name for logging
|
||||
*/
|
||||
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
|
||||
|
||||
CoroTaskRunner(CoroTaskRunner const&) = delete;
|
||||
CoroTaskRunner&
|
||||
operator=(CoroTaskRunner const&) = delete;
|
||||
|
||||
/**
|
||||
* Destructor. Asserts (debug) that the coroutine has finished
|
||||
* or expectEarlyExit() was called.
|
||||
*/
|
||||
~CoroTaskRunner();
|
||||
|
||||
/**
|
||||
* Initialize with a coroutine-returning callable.
|
||||
* Must be called exactly once, after the object is managed by
|
||||
* shared_ptr (because init uses shared_from_this internally).
|
||||
* This is handled automatically by postCoroTask().
|
||||
*
|
||||
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
|
||||
*/
|
||||
template <class F>
|
||||
void
|
||||
init(F&& f);
|
||||
|
||||
/**
|
||||
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
|
||||
* Called when the coroutine is about to suspend. Every call
|
||||
* must be balanced by a corresponding decrement (via resume()
|
||||
* or onUndoSuspend()), or JobQueue::stop() will hang.
|
||||
*/
|
||||
void
|
||||
onSuspend();
|
||||
|
||||
/**
|
||||
* Decrement nSuspend_ without resuming.
|
||||
* Used to undo onSuspend() when a scheduled post() fails
|
||||
* (e.g. JobQueue is stopping).
|
||||
*/
|
||||
void
|
||||
onUndoSuspend();
|
||||
|
||||
/**
|
||||
* Suspend the coroutine.
|
||||
* The awaiter's await_suspend() increments nSuspend_ before the
|
||||
* coroutine actually suspends. The caller must later call post()
|
||||
* or resume() to continue execution.
|
||||
*
|
||||
* @return An awaiter for use with `co_await runner->suspend()`
|
||||
*/
|
||||
auto
|
||||
suspend();
|
||||
|
||||
/**
|
||||
* Schedule coroutine resumption as a job on the JobQueue.
|
||||
* Captures shared_from_this() to prevent this runner from being
|
||||
* destroyed while the job is queued.
|
||||
*
|
||||
* @return true if the job was accepted; false if the JobQueue
|
||||
* is stopping (caller must handle cleanup)
|
||||
*/
|
||||
bool
|
||||
post();
|
||||
|
||||
/**
|
||||
* Resume the coroutine on the current thread.
|
||||
* Decrements nSuspend_, swaps in LocalValues, resumes the
|
||||
* coroutine handle, swaps out LocalValues, and notifies join()
|
||||
* waiters. Lock ordering: mutex_run_ -> jq_.m_mutex -> mutex_.
|
||||
*/
|
||||
void
|
||||
resume();
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has not yet run to completion
|
||||
*/
|
||||
bool
|
||||
runnable() const;
|
||||
|
||||
/**
|
||||
* Handle early termination when the coroutine never ran.
|
||||
* Decrements nSuspend_ and destroys the coroutine frame to
|
||||
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
|
||||
* Called by postCoroTask() when post() fails.
|
||||
*/
|
||||
void
|
||||
expectEarlyExit();
|
||||
|
||||
/**
|
||||
* Block until all pending/active resume operations complete.
|
||||
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
|
||||
* Warning: deadlocks if the coroutine is suspended and never posted.
|
||||
*/
|
||||
void
|
||||
join();
|
||||
};
|
||||
|
||||
using JobFunction = std::function<void()>;
|
||||
|
||||
JobQueue(
|
||||
@@ -165,6 +556,19 @@ public:
|
||||
std::shared_ptr<Coro>
|
||||
postCoro(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Creates a C++20 coroutine and adds a job to the queue to run it.
|
||||
|
||||
@param t The type of job.
|
||||
@param name Name of the job.
|
||||
@param f Callable with signature
|
||||
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
|
||||
|
||||
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
|
||||
*/
|
||||
template <class F>
|
||||
std::shared_ptr<CoroTaskRunner>
|
||||
postCoroTask(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Jobs waiting at this priority.
|
||||
*/
|
||||
int
|
||||
@@ -379,6 +783,7 @@ private:
|
||||
} // namespace xrpl
|
||||
|
||||
#include <xrpl/core/Coro.ipp>
|
||||
#include <xrpl/core/CoroTaskRunner.ipp>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
@@ -401,4 +806,69 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
||||
return coro;
|
||||
}
|
||||
|
||||
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
|
||||
//
|
||||
// Control Flow
|
||||
// ============
|
||||
//
|
||||
// postCoroTask(t, name, f)
|
||||
// |
|
||||
// +-- 1. Check stopping_ — reject if JQ shutting down
|
||||
// |
|
||||
// +-- 2. ++nSuspend_ (mirrors Boost Coro ctor's implicit yield)
|
||||
// | The coroutine is "suspended" from the JobQueue's perspective
|
||||
// | even though it hasn't run yet — this keeps the JQ shutdown
|
||||
// | logic correct (it waits for nSuspend_ to reach 0).
|
||||
// |
|
||||
// +-- 3. Create CoroTaskRunner (shared_ptr, ref-counted)
|
||||
// |
|
||||
// +-- 4. runner->init(f)
|
||||
// | +-- Heap-allocate the lambda (FuncStore) to prevent
|
||||
// | | dangling captures in the coroutine frame
|
||||
// | +-- task_ = f(shared_from_this())
|
||||
// | [coroutine created but NOT started — lazy initial_suspend]
|
||||
// |
|
||||
// +-- 5. runner->post()
|
||||
// | +-- addJob(type_, [resume]{}) → resume on worker thread
|
||||
// | +-- failure (JQ stopping):
|
||||
// | +-- runner->expectEarlyExit()
|
||||
// | | --nSuspend_, destroy coroutine frame
|
||||
// | +-- return nullptr
|
||||
//
|
||||
// Why async post() instead of synchronous resume()?
|
||||
// ==================================================
|
||||
// The initial dispatch MUST use async post() so the coroutine body runs on
|
||||
// a JobQueue worker thread, not the caller's thread. resume() swaps the
|
||||
// caller's thread-local LocalValues with the coroutine's private copy.
|
||||
// If the coroutine mutates LocalValues (e.g. thread_specific_storage test),
|
||||
// those mutations bleed back into the caller's thread-local state after the
|
||||
// swap-out, corrupting subsequent tests that share the same thread pool.
|
||||
// Async post() avoids this by running the coroutine on a worker thread whose
|
||||
// LocalValues are managed by the thread pool, not by the caller.
|
||||
//
|
||||
template <class F>
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner>
|
||||
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
|
||||
{
|
||||
// Reject if the JQ is shutting down — matches addJob()'s stopping_ check.
|
||||
// Must check before incrementing nSuspend_ to avoid leaving an orphan
|
||||
// count that would cause stop() to hang.
|
||||
if (stopping_)
|
||||
return nullptr;
|
||||
|
||||
{
|
||||
std::lock_guard lock(m_mutex);
|
||||
++nSuspend_;
|
||||
}
|
||||
|
||||
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
|
||||
runner->init(std::forward<F>(f));
|
||||
if (!runner->post())
|
||||
{
|
||||
runner->expectEarlyExit();
|
||||
runner.reset();
|
||||
}
|
||||
return runner;
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
174
include/xrpl/core/JobQueueAwaiter.h
Normal file
174
include/xrpl/core/JobQueueAwaiter.h
Normal file
@@ -0,0 +1,174 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
|
||||
#include <coroutine>
|
||||
#include <memory>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* Awaiter that suspends and immediately reschedules on the JobQueue.
|
||||
* Equivalent to calling yield() followed by post() in the old Coro API.
|
||||
*
|
||||
* Usage:
|
||||
* co_await JobQueueAwaiter{runner};
|
||||
*
|
||||
* What it waits for: The coroutine is re-queued as a job and resumes
|
||||
* when a worker thread picks it up.
|
||||
*
|
||||
* Which thread resumes: A JobQueue worker thread.
|
||||
*
|
||||
* What await_resume() returns: void.
|
||||
*
|
||||
* Dependency Diagram
|
||||
* ==================
|
||||
*
|
||||
* JobQueueAwaiter
|
||||
* +----------------------------------------------+
|
||||
* | + runner : shared_ptr<CoroTaskRunner> |
|
||||
* +----------------------------------------------+
|
||||
* | + await_ready() -> false (always suspend) |
|
||||
* | + await_suspend() -> bool (suspend or cancel) |
|
||||
* | + await_resume() -> void |
|
||||
* +----------------------------------------------+
|
||||
* | |
|
||||
* | uses | uses
|
||||
* v v
|
||||
* CoroTaskRunner JobQueue
|
||||
* .onSuspend() (via runner->post() -> addJob)
|
||||
* .onUndoSuspend()
|
||||
* .post()
|
||||
*
|
||||
* Control Flow (await_suspend)
|
||||
* ============================
|
||||
*
|
||||
* co_await JobQueueAwaiter{runner}
|
||||
* |
|
||||
* +-- await_ready() -> false
|
||||
* +-- await_suspend(handle)
|
||||
* |
|
||||
* +-- runner->onSuspend() // ++nSuspend_
|
||||
* +-- runner->post() // addJob to JobQueue
|
||||
* | |
|
||||
* | +-- success? return true // coroutine stays suspended
|
||||
* | | // worker thread will call resume()
|
||||
* | +-- failure? (JQ stopping)
|
||||
* | +-- runner->onUndoSuspend() // --nSuspend_
|
||||
* | +-- return false // coroutine continues immediately
|
||||
* | // so it can clean up and co_return
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Yield and auto-repost (most common -- replaces yield() + post()):
|
||||
*
|
||||
* CoroTask<void> handler(auto runner) {
|
||||
* doPartA();
|
||||
* co_await JobQueueAwaiter{runner}; // yield + repost
|
||||
* doPartB(); // runs on a worker thread
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* 2. Multiple yield points in a loop:
|
||||
*
|
||||
* CoroTask<void> batchProcessor(auto runner) {
|
||||
* for (auto& item : items) {
|
||||
* process(item);
|
||||
* co_await JobQueueAwaiter{runner}; // let other jobs run
|
||||
* }
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* 3. Graceful shutdown -- checking after resume:
|
||||
*
|
||||
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
|
||||
* while (hasWork()) {
|
||||
* co_await JobQueueAwaiter{runner};
|
||||
* // If JQ is stopping, await_suspend returns false and
|
||||
* // the coroutine continues immediately without re-queuing.
|
||||
* // Always check isStopping() to decide whether to proceed:
|
||||
* if (jq.isStopping())
|
||||
* co_return;
|
||||
* doNextChunk();
|
||||
* }
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* Caveats / Pitfalls
|
||||
* ==================
|
||||
*
|
||||
* BUG-RISK: Using a stale or null runner.
|
||||
* The runner shared_ptr must be valid and point to the CoroTaskRunner
|
||||
* that owns the coroutine currently executing. Passing a runner from
|
||||
* a different coroutine, or a default-constructed shared_ptr, is UB.
|
||||
*
|
||||
* BUG-RISK: Assuming resume happens on the same thread.
|
||||
* After co_await JobQueueAwaiter, the coroutine resumes on whatever
|
||||
* worker thread picks up the job. Do not rely on thread-local state
|
||||
* unless it is managed through LocalValue (which CoroTaskRunner
|
||||
* automatically swaps in/out).
|
||||
*
|
||||
* BUG-RISK: Ignoring the shutdown path.
|
||||
* When the JobQueue is stopping, post() fails and await_suspend()
|
||||
* returns false (coroutine does NOT actually suspend). The coroutine
|
||||
* body continues immediately on the same thread. If your code after
|
||||
* co_await assumes it was re-queued and is running on a worker thread,
|
||||
* that assumption breaks during shutdown. Always handle the "JQ is
|
||||
* stopping" case, either by checking jq.isStopping() or by letting
|
||||
* the coroutine fall through to co_return naturally.
|
||||
*
|
||||
* DIFFERENCE from runner->suspend() + runner->post():
|
||||
* JobQueueAwaiter combines both in one atomic operation. With the
|
||||
* manual suspend()/post() pattern, there is a window between the
|
||||
* two calls where an external event could race. JobQueueAwaiter
|
||||
* removes that window -- onSuspend() and post() happen within the
|
||||
* same await_suspend() call while the coroutine is guaranteed to
|
||||
* be suspended. Prefer JobQueueAwaiter unless you need an external
|
||||
* party to decide *when* to call post().
|
||||
*/
|
||||
struct JobQueueAwaiter
|
||||
{
|
||||
// The CoroTaskRunner that owns the currently executing coroutine.
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
|
||||
|
||||
/**
|
||||
* Always returns false so the coroutine suspends.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment nSuspend (equivalent to yield()) and schedule resume
|
||||
* on the JobQueue (equivalent to post()). If the JobQueue is
|
||||
* stopping, undoes the suspend count and returns false so the
|
||||
* coroutine continues immediately and can clean up.
|
||||
*
|
||||
* @return true if coroutine should stay suspended (job posted);
|
||||
* false if coroutine should continue (JQ stopping)
|
||||
*/
|
||||
bool
|
||||
await_suspend(std::coroutine_handle<>)
|
||||
{
|
||||
runner->onSuspend();
|
||||
if (!runner->post())
|
||||
{
|
||||
// JobQueue is stopping. Undo the suspend count and
|
||||
// don't actually suspend — the coroutine continues
|
||||
// immediately so it can clean up and co_return.
|
||||
runner->onUndoSuspend();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -15,9 +15,10 @@
|
||||
|
||||
// Add new amendments to the top of this list.
|
||||
// Keep it sorted in reverse chronological order.
|
||||
XRPL_FEATURE(BatchV1_1, Supported::no, VoteBehavior::DefaultNo)
|
||||
|
||||
XRPL_FIX (PermissionedDomainInvariant, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (ExpiredNFTokenOfferRemoval, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (BatchInnerSigs, Supported::no, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(LendingProtocol, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(PermissionDelegationV1_1, Supported::no, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (DirectoryLimit, Supported::yes, VoteBehavior::DefaultNo)
|
||||
@@ -31,6 +32,7 @@ XRPL_FEATURE(TokenEscrow, Supported::yes, VoteBehavior::DefaultNo
|
||||
XRPL_FIX (EnforceNFTokenTrustlineV2, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (AMMv1_3, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(PermissionedDEX, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(Batch, Supported::no, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(SingleAssetVault, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (PayChanCancelAfter, Supported::yes, VoteBehavior::DefaultNo)
|
||||
// Check flags in Credential transactions
|
||||
|
||||
@@ -918,7 +918,7 @@ TRANSACTION(ttVAULT_CLAWBACK, 70, VaultClawback,
|
||||
#endif
|
||||
TRANSACTION(ttBATCH, 71, Batch,
|
||||
Delegation::notDelegable,
|
||||
featureBatchV1_1,
|
||||
featureBatch,
|
||||
noPriv,
|
||||
({
|
||||
{sfRawTransactions, soeREQUIRED},
|
||||
|
||||
@@ -162,6 +162,9 @@ public:
|
||||
static NotTEC
|
||||
checkSign(PreclaimContext const& ctx);
|
||||
|
||||
static NotTEC
|
||||
checkBatchSign(PreclaimContext const& ctx);
|
||||
|
||||
// Returns the fee in fee units, not scaled for load.
|
||||
static XRPAmount
|
||||
calculateBaseFee(ReadView const& view, STTx const& tx);
|
||||
@@ -290,7 +293,14 @@ protected:
|
||||
std::optional<T> value,
|
||||
unit::ValueUnit<Unit, T> min = unit::ValueUnit<Unit, T>{});
|
||||
|
||||
protected:
|
||||
private:
|
||||
std::pair<TER, XRPAmount>
|
||||
reset(XRPAmount fee);
|
||||
|
||||
TER
|
||||
consumeSeqProxy(SLE::pointer const& sleAccount);
|
||||
TER
|
||||
payFee();
|
||||
static NotTEC
|
||||
checkSingleSign(
|
||||
ReadView const& view,
|
||||
@@ -306,15 +316,6 @@ protected:
|
||||
STObject const& sigObject,
|
||||
beast::Journal const j);
|
||||
|
||||
private:
|
||||
std::pair<TER, XRPAmount>
|
||||
reset(XRPAmount fee);
|
||||
|
||||
TER
|
||||
consumeSeqProxy(SLE::pointer const& sleAccount);
|
||||
TER
|
||||
payFee();
|
||||
|
||||
void trapTransaction(uint256) const;
|
||||
|
||||
/** Performs early sanity checks on the account and fee fields.
|
||||
|
||||
@@ -27,9 +27,6 @@ public:
|
||||
static NotTEC
|
||||
preflightSigValidated(PreflightContext const& ctx);
|
||||
|
||||
static NotTEC
|
||||
checkBatchSign(PreclaimContext const& ctx);
|
||||
|
||||
static NotTEC
|
||||
checkSign(PreclaimContext const& ctx);
|
||||
|
||||
|
||||
@@ -278,8 +278,6 @@ STTx::checkBatchSign(Rules const& rules) const
|
||||
JLOG(debugLog().fatal()) << "not a batch transaction";
|
||||
return Unexpected("Not a batch transaction.");
|
||||
}
|
||||
if (!isFieldPresent(sfBatchSigners))
|
||||
return Unexpected("Missing BatchSigners field.");
|
||||
STArray const& signers{getFieldArray(sfBatchSigners)};
|
||||
for (auto const& signer : signers)
|
||||
{
|
||||
@@ -294,8 +292,9 @@ STTx::checkBatchSign(Rules const& rules) const
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
return Unexpected(std::string("Internal batch signature check failure: ") + e.what());
|
||||
JLOG(debugLog().error()) << "Batch signature check failed: " << e.what();
|
||||
}
|
||||
return Unexpected("Internal batch signature check failure.");
|
||||
}
|
||||
|
||||
Json::Value
|
||||
@@ -417,7 +416,6 @@ STTx::checkBatchSingleSign(STObject const& batchSigner) const
|
||||
{
|
||||
Serializer msg;
|
||||
serializeBatch(msg, getFlags(), getBatchTransactionIDs());
|
||||
finishMultiSigningData(batchSigner.getAccountID(sfAccount), msg);
|
||||
return singleSignHelper(batchSigner, msg.slice());
|
||||
}
|
||||
|
||||
@@ -490,7 +488,7 @@ multiSignHelper(
|
||||
if (!validSig)
|
||||
return Unexpected(
|
||||
std::string("Invalid signature on account ") + toBase58(accountID) +
|
||||
(errorWhat ? ": " + *errorWhat : "") + ".");
|
||||
errorWhat.value_or("") + ".");
|
||||
}
|
||||
// All signatures verified.
|
||||
return {};
|
||||
|
||||
@@ -175,12 +175,12 @@ Transactor::preflight1(PreflightContext const& ctx, std::uint32_t flagMask)
|
||||
if (ctx.tx.getSeqProxy().isTicket() && ctx.tx.isFieldPresent(sfAccountTxnID))
|
||||
return temINVALID;
|
||||
|
||||
if (ctx.tx.isFlag(tfInnerBatchTxn) && !ctx.rules.enabled(featureBatchV1_1))
|
||||
if (ctx.tx.isFlag(tfInnerBatchTxn) && !ctx.rules.enabled(featureBatch))
|
||||
return temINVALID_FLAG;
|
||||
|
||||
XRPL_ASSERT(
|
||||
ctx.tx.isFlag(tfInnerBatchTxn) == ctx.parentBatchId.has_value() ||
|
||||
!ctx.rules.enabled(featureBatchV1_1),
|
||||
!ctx.rules.enabled(featureBatch),
|
||||
"Inner batch transaction must have a parent batch ID.");
|
||||
|
||||
return tesSUCCESS;
|
||||
@@ -196,13 +196,13 @@ Transactor::preflight2(PreflightContext const& ctx)
|
||||
return *ret;
|
||||
|
||||
// It should be impossible for the InnerBatchTxn flag to be set without
|
||||
// featureBatchV1_1 being enabled
|
||||
// featureBatch being enabled
|
||||
XRPL_ASSERT_PARTS(
|
||||
!ctx.tx.isFlag(tfInnerBatchTxn) || ctx.rules.enabled(featureBatchV1_1),
|
||||
!ctx.tx.isFlag(tfInnerBatchTxn) || ctx.rules.enabled(featureBatch),
|
||||
"xrpl::Transactor::preflight2",
|
||||
"InnerBatch flag only set if feature enabled");
|
||||
// Skip signature check on batch inner transactions
|
||||
if (ctx.tx.isFlag(tfInnerBatchTxn) && ctx.rules.enabled(featureBatchV1_1))
|
||||
if (ctx.tx.isFlag(tfInnerBatchTxn) && ctx.rules.enabled(featureBatch))
|
||||
return tesSUCCESS;
|
||||
// Do not add any checks after this point that are relevant for
|
||||
// batch inner transactions. They will be skipped.
|
||||
@@ -647,7 +647,7 @@ Transactor::checkSign(
|
||||
|
||||
auto const pkSigner = sigObject.getFieldVL(sfSigningPubKey);
|
||||
// Ignore signature check on batch inner transactions
|
||||
if (parentBatchId && view.rules().enabled(featureBatchV1_1))
|
||||
if (parentBatchId && view.rules().enabled(featureBatch))
|
||||
{
|
||||
// Defensive Check: These values are also checked in Batch::preflight
|
||||
if (sigObject.isFieldPresent(sfTxnSignature) || !pkSigner.empty() ||
|
||||
@@ -699,6 +699,50 @@ Transactor::checkSign(PreclaimContext const& ctx)
|
||||
return checkSign(ctx.view, ctx.flags, ctx.parentBatchId, idAccount, ctx.tx, ctx.j);
|
||||
}
|
||||
|
||||
NotTEC
|
||||
Transactor::checkBatchSign(PreclaimContext const& ctx)
|
||||
{
|
||||
NotTEC ret = tesSUCCESS;
|
||||
STArray const& signers{ctx.tx.getFieldArray(sfBatchSigners)};
|
||||
for (auto const& signer : signers)
|
||||
{
|
||||
auto const idAccount = signer.getAccountID(sfAccount);
|
||||
|
||||
Blob const& pkSigner = signer.getFieldVL(sfSigningPubKey);
|
||||
if (pkSigner.empty())
|
||||
{
|
||||
if (ret = checkMultiSign(ctx.view, ctx.flags, idAccount, signer, ctx.j);
|
||||
!isTesSuccess(ret))
|
||||
return ret;
|
||||
}
|
||||
else
|
||||
{
|
||||
// LCOV_EXCL_START
|
||||
if (!publicKeyType(makeSlice(pkSigner)))
|
||||
return tefBAD_AUTH;
|
||||
// LCOV_EXCL_STOP
|
||||
|
||||
auto const idSigner = calcAccountID(PublicKey(makeSlice(pkSigner)));
|
||||
auto const sleAccount = ctx.view.read(keylet::account(idAccount));
|
||||
|
||||
// A batch can include transactions from an un-created account ONLY
|
||||
// when the account master key is the signer
|
||||
if (!sleAccount)
|
||||
{
|
||||
if (idAccount != idSigner)
|
||||
return tefBAD_AUTH;
|
||||
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
if (ret = checkSingleSign(ctx.view, idSigner, idAccount, sleAccount, ctx.j);
|
||||
!isTesSuccess(ret))
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
NotTEC
|
||||
Transactor::checkSingleSign(
|
||||
ReadView const& view,
|
||||
|
||||
@@ -24,12 +24,29 @@ checkValidity(HashRouter& router, STTx const& tx, Rules const& rules)
|
||||
auto const flags = router.getFlags(id);
|
||||
|
||||
// Ignore signature check on batch inner transactions
|
||||
if (tx.isFlag(tfInnerBatchTxn) && rules.enabled(featureBatchV1_1))
|
||||
if (tx.isFlag(tfInnerBatchTxn) && rules.enabled(featureBatch))
|
||||
{
|
||||
// Defensive Check: These values are also checked in Batch::preflight
|
||||
if (tx.isFieldPresent(sfTxnSignature) || !tx.getSigningPubKey().empty() ||
|
||||
tx.isFieldPresent(sfSigners))
|
||||
return {Validity::SigBad, "Malformed: Invalid inner batch transaction."};
|
||||
|
||||
// This block should probably have never been included in the
|
||||
// original `Batch` implementation. An inner transaction never
|
||||
// has a valid signature.
|
||||
bool const neverValid = rules.enabled(fixBatchInnerSigs);
|
||||
if (!neverValid)
|
||||
{
|
||||
std::string reason;
|
||||
if (!passesLocalChecks(tx, reason))
|
||||
{
|
||||
router.setFlags(id, SF_LOCALBAD);
|
||||
return {Validity::SigGoodOnly, reason};
|
||||
}
|
||||
|
||||
router.setFlags(id, SF_SIGGOOD);
|
||||
return {Validity::Valid, ""};
|
||||
}
|
||||
}
|
||||
|
||||
if (any(flags & SF_SIGBAD))
|
||||
|
||||
@@ -107,18 +107,7 @@ Batch::calculateBaseFee(ReadView const& view, STTx const& tx)
|
||||
if (signer.isFieldPresent(sfTxnSignature))
|
||||
signerCount += 1;
|
||||
else if (signer.isFieldPresent(sfSigners))
|
||||
{
|
||||
auto const& nestedSigners = signer.getFieldArray(sfSigners);
|
||||
// LCOV_EXCL_START
|
||||
if (nestedSigners.size() > STTx::maxMultiSigners)
|
||||
{
|
||||
JLOG(debugLog().error())
|
||||
<< "BatchTrace: Nested Signers array exceeds max entries.";
|
||||
return XRPAmount{INITIAL_XRP};
|
||||
}
|
||||
// LCOV_EXCL_STOP
|
||||
signerCount += nestedSigners.size();
|
||||
}
|
||||
signerCount += signer.getFieldArray(sfSigners).size();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,14 +205,6 @@ Batch::preflight(PreflightContext const& ctx)
|
||||
return temARRAY_TOO_LARGE;
|
||||
}
|
||||
|
||||
if (ctx.tx.isFieldPresent(sfBatchSigners) &&
|
||||
ctx.tx.getFieldArray(sfBatchSigners).size() > maxBatchTxCount)
|
||||
{
|
||||
JLOG(ctx.j.debug()) << "BatchTrace[" << parentBatchId << "]:"
|
||||
<< "signers array exceeds 8 entries.";
|
||||
return temARRAY_TOO_LARGE;
|
||||
}
|
||||
|
||||
// Validation Inner Batch Txns
|
||||
std::unordered_set<uint256> uniqueHashes;
|
||||
std::unordered_map<AccountID, std::unordered_set<std::uint32_t>> accountSeqTicket;
|
||||
@@ -445,7 +426,7 @@ Batch::preflightSigValidated(PreflightContext const& ctx)
|
||||
if (requiredSigners.erase(signerAccount) == 0)
|
||||
{
|
||||
JLOG(ctx.j.debug()) << "BatchTrace[" << parentBatchId << "]: "
|
||||
<< "extra signer provided: " << signerAccount;
|
||||
<< "no account signature for inner txn.";
|
||||
return temBAD_SIGNER;
|
||||
}
|
||||
}
|
||||
@@ -470,54 +451,6 @@ Batch::preflightSigValidated(PreflightContext const& ctx)
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
NotTEC
|
||||
Batch::checkBatchSign(PreclaimContext const& ctx)
|
||||
{
|
||||
NotTEC ret = tesSUCCESS;
|
||||
STArray const& signers{ctx.tx.getFieldArray(sfBatchSigners)};
|
||||
for (auto const& signer : signers)
|
||||
{
|
||||
auto const idAccount = signer.getAccountID(sfAccount);
|
||||
|
||||
Blob const& pkSigner = signer.getFieldVL(sfSigningPubKey);
|
||||
if (pkSigner.empty())
|
||||
{
|
||||
if (ret = checkMultiSign(ctx.view, ctx.flags, idAccount, signer, ctx.j);
|
||||
!isTesSuccess(ret))
|
||||
return ret;
|
||||
}
|
||||
else
|
||||
{
|
||||
// LCOV_EXCL_START
|
||||
if (!publicKeyType(makeSlice(pkSigner)))
|
||||
return tefBAD_AUTH;
|
||||
// LCOV_EXCL_STOP
|
||||
|
||||
auto const idSigner = calcAccountID(PublicKey(makeSlice(pkSigner)));
|
||||
auto const sleAccount = ctx.view.read(keylet::account(idAccount));
|
||||
|
||||
if (sleAccount)
|
||||
{
|
||||
if (isPseudoAccount(sleAccount))
|
||||
return tefBAD_AUTH;
|
||||
|
||||
if (ret = checkSingleSign(ctx.view, idSigner, idAccount, sleAccount, ctx.j);
|
||||
!isTesSuccess(ret))
|
||||
return ret;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (idAccount != idSigner)
|
||||
return tefBAD_AUTH;
|
||||
|
||||
// A batch can include transactions from an un-created account ONLY
|
||||
// when the account master key is the signer
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Checks the validity of signatures for a batch transaction.
|
||||
*
|
||||
@@ -526,7 +459,7 @@ Batch::checkBatchSign(PreclaimContext const& ctx)
|
||||
* corresponding error code.
|
||||
*
|
||||
* Next, it verifies the batch-specific signature requirements by calling
|
||||
* Batch::checkBatchSign. If this check fails, it also returns the
|
||||
* Transactor::checkBatchSign. If this check fails, it also returns the
|
||||
* corresponding error code.
|
||||
*
|
||||
* If both checks succeed, the function returns tesSUCCESS.
|
||||
@@ -541,11 +474,8 @@ Batch::checkSign(PreclaimContext const& ctx)
|
||||
if (auto ret = Transactor::checkSign(ctx); !isTesSuccess(ret))
|
||||
return ret;
|
||||
|
||||
if (ctx.tx.isFieldPresent(sfBatchSigners))
|
||||
{
|
||||
if (auto ret = checkBatchSign(ctx); !isTesSuccess(ret))
|
||||
return ret;
|
||||
}
|
||||
if (auto ret = Transactor::checkBatchSign(ctx); !isTesSuccess(ret))
|
||||
return ret;
|
||||
|
||||
return tesSUCCESS;
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ LoanSet::preflight(PreflightContext const& ctx)
|
||||
auto const& tx = ctx.tx;
|
||||
|
||||
// Special case for Batch inner transactions
|
||||
if (tx.isFlag(tfInnerBatchTxn) && ctx.rules.enabled(featureBatchV1_1) &&
|
||||
if (tx.isFlag(tfInnerBatchTxn) && ctx.rules.enabled(featureBatch) &&
|
||||
!tx.isFieldPresent(sfCounterparty))
|
||||
{
|
||||
auto const parentBatchId = ctx.parentBatchId.value_or(uint256{0});
|
||||
|
||||
@@ -141,11 +141,14 @@ class Batch_test : public beast::unit_test::suite
|
||||
using namespace test::jtx;
|
||||
using namespace std::literals;
|
||||
|
||||
bool const withInnerSigFix = features[fixBatchInnerSigs];
|
||||
|
||||
for (bool const withBatch : {true, false})
|
||||
{
|
||||
testcase << "enabled: Batch " << (withBatch ? "enabled" : "disabled");
|
||||
testcase << "enabled: Batch " << (withBatch ? "enabled" : "disabled")
|
||||
<< ", Inner Sig Fix: " << (withInnerSigFix ? "enabled" : "disabled");
|
||||
|
||||
auto const amend = withBatch ? features : features - featureBatchV1_1;
|
||||
auto const amend = withBatch ? features : features - featureBatch;
|
||||
|
||||
test::jtx::Env env{*this, amend};
|
||||
|
||||
@@ -550,7 +553,6 @@ class Batch_test : public beast::unit_test::suite
|
||||
|
||||
Serializer msg;
|
||||
serializeBatch(msg, tfAllOrNothing, jt.stx->getBatchTransactionIDs());
|
||||
finishMultiSigningData(bob.id(), msg);
|
||||
auto const sig = xrpl::sign(bob.pk(), bob.sk(), msg.slice());
|
||||
jt.jv[sfBatchSigners.jsonName][0u][sfBatchSigner.jsonName][sfAccount.jsonName] =
|
||||
bob.human();
|
||||
@@ -1403,7 +1405,7 @@ class Batch_test : public beast::unit_test::suite
|
||||
env.close();
|
||||
}
|
||||
|
||||
// temARRAY_TOO_LARGE: Batch preflight: signers array exceeds 8 entries.
|
||||
// temARRAY_TOO_LARGE: Batch: signers array exceeds 8 entries.
|
||||
{
|
||||
test::jtx::Env env{*this, features};
|
||||
|
||||
@@ -2189,16 +2191,22 @@ class Batch_test : public beast::unit_test::suite
|
||||
void
|
||||
doTestInnerSubmitRPC(FeatureBitset features, bool withBatch)
|
||||
{
|
||||
std::string const testName =
|
||||
std::string("inner submit rpc: batch ") + (withBatch ? "enabled" : "disabled") + ": ";
|
||||
bool const withInnerSigFix = features[fixBatchInnerSigs];
|
||||
|
||||
auto const amend = withBatch ? features : features - featureBatchV1_1;
|
||||
std::string const testName = [&]() {
|
||||
std::stringstream ss;
|
||||
ss << "inner submit rpc: batch " << (withBatch ? "enabled" : "disabled")
|
||||
<< ", inner sig fix: " << (withInnerSigFix ? "enabled" : "disabled") << ": ";
|
||||
return ss.str();
|
||||
}();
|
||||
|
||||
auto const amend = withBatch ? features : features - featureBatch;
|
||||
|
||||
using namespace test::jtx;
|
||||
using namespace std::literals;
|
||||
|
||||
test::jtx::Env env{*this, amend};
|
||||
if (!BEAST_EXPECT(amend[featureBatchV1_1] == withBatch))
|
||||
if (!BEAST_EXPECT(amend[featureBatch] == withBatch))
|
||||
return;
|
||||
|
||||
auto const alice = Account("alice");
|
||||
@@ -2320,7 +2328,8 @@ class Batch_test : public beast::unit_test::suite
|
||||
s.slice(),
|
||||
__LINE__,
|
||||
"fails local checks: Empty SigningPubKey.",
|
||||
"fails local checks: Empty SigningPubKey.");
|
||||
"fails local checks: Empty SigningPubKey.",
|
||||
withBatch && !withInnerSigFix);
|
||||
}
|
||||
|
||||
// Invalid RPC Submission: tfInnerBatchTxn pseudo-transaction
|
||||
@@ -2331,7 +2340,7 @@ class Batch_test : public beast::unit_test::suite
|
||||
{
|
||||
STTx amendTx(ttAMENDMENT, [seq = env.closed()->header().seq + 1](auto& obj) {
|
||||
obj.setAccountID(sfAccount, AccountID());
|
||||
obj.setFieldH256(sfAmendment, featureBatchV1_1);
|
||||
obj.setFieldH256(sfAmendment, fixBatchInnerSigs);
|
||||
obj.setFieldU32(sfLedgerSequence, seq);
|
||||
obj.setFieldU32(sfFlags, tfInnerBatchTxn);
|
||||
});
|
||||
@@ -2343,7 +2352,8 @@ class Batch_test : public beast::unit_test::suite
|
||||
"Pseudo-transaction",
|
||||
s.slice(),
|
||||
__LINE__,
|
||||
"fails local checks: Empty SigningPubKey.",
|
||||
withInnerSigFix ? "fails local checks: Empty SigningPubKey."
|
||||
: "fails local checks: Cannot submit pseudo transactions.",
|
||||
"fails local checks: Empty SigningPubKey.");
|
||||
}
|
||||
}
|
||||
@@ -2404,53 +2414,6 @@ class Batch_test : public beast::unit_test::suite
|
||||
BEAST_EXPECT(env.balance(bob) == XRP(1000));
|
||||
}
|
||||
|
||||
void
|
||||
testCheckAllSignatures(FeatureBitset features)
|
||||
{
|
||||
testcase("check all signatures");
|
||||
|
||||
using namespace test::jtx;
|
||||
using namespace std::literals;
|
||||
|
||||
// Verifies that checkBatchSign validates all signers even when an
|
||||
// unfunded account (signed with its master key) appears first in the
|
||||
// sorted signer list. A funded account with an invalid signature must
|
||||
// still be rejected with tefBAD_AUTH.
|
||||
|
||||
test::jtx::Env env{*this, features};
|
||||
|
||||
auto const alice = Account("alice");
|
||||
// "aaa" sorts before other accounts alphabetically, ensuring the
|
||||
// unfunded account is checked first in the sorted signer list
|
||||
auto const unfunded = Account("aaa");
|
||||
auto const carol = Account("carol");
|
||||
env.fund(XRP(10000), alice, carol);
|
||||
env.close();
|
||||
|
||||
// Verify sort order: unfunded.id() < carol.id()
|
||||
BEAST_EXPECT(unfunded.id() < carol.id());
|
||||
|
||||
auto const seq = env.seq(alice);
|
||||
auto const ledSeq = env.current()->seq();
|
||||
auto const batchFee = batch::calcBatchFee(env, 2, 3);
|
||||
|
||||
// The batch includes:
|
||||
// 1. alice pays unfunded (to create unfunded's account)
|
||||
// 2. unfunded does a noop (signed by unfunded's master key - valid)
|
||||
// 3. carol pays alice (signed by alice's key - INVALID since alice is
|
||||
// not carol's regular key)
|
||||
//
|
||||
// checkBatchSign must validate all signers regardless of order.
|
||||
// This must fail with tefBAD_AUTH.
|
||||
env(batch::outer(alice, seq, batchFee, tfAllOrNothing),
|
||||
batch::inner(pay(alice, unfunded, XRP(100)), seq + 1),
|
||||
batch::inner(noop(unfunded), ledSeq),
|
||||
batch::inner(pay(carol, alice, XRP(1000)), env.seq(carol)),
|
||||
batch::sig(unfunded, Reg{carol, alice}),
|
||||
ter(tefBAD_AUTH));
|
||||
env.close();
|
||||
}
|
||||
|
||||
void
|
||||
testAccountSet(FeatureBitset features)
|
||||
{
|
||||
@@ -4368,7 +4331,6 @@ class Batch_test : public beast::unit_test::suite
|
||||
testIndependent(features);
|
||||
testInnerSubmitRPC(features);
|
||||
testAccountActivation(features);
|
||||
testCheckAllSignatures(features);
|
||||
testAccountSet(features);
|
||||
testAccountDelete(features);
|
||||
testLoan(features);
|
||||
@@ -4394,6 +4356,7 @@ public:
|
||||
{
|
||||
using namespace test::jtx;
|
||||
auto const sa = testable_amendments();
|
||||
testWithFeats(sa - fixBatchInnerSigs);
|
||||
testWithFeats(sa);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -5340,20 +5340,20 @@ class Vault_test : public beast::unit_test::suite
|
||||
env.close();
|
||||
|
||||
// 2. Mantissa larger than uint64 max
|
||||
env.set_parse_failure_expected(true);
|
||||
try
|
||||
{
|
||||
tx[sfAssetsMaximum] = "18446744073709551617e5"; // uint64 max + 1
|
||||
env(tx, THISLINE);
|
||||
BEAST_EXPECT(false);
|
||||
BEAST_EXPECTS(false, "Expected parse_error for mantissa larger than uint64 max");
|
||||
}
|
||||
catch (parse_error const& e)
|
||||
{
|
||||
using namespace std::string_literals;
|
||||
BEAST_EXPECT(
|
||||
e.what() ==
|
||||
"invalidParamsField 'tx_json.AssetsMaximum' has invalid "
|
||||
"data."s);
|
||||
e.what() == "invalidParamsField 'tx_json.AssetsMaximum' has invalid data."s);
|
||||
}
|
||||
env.set_parse_failure_expected(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
537
src/test/core/CoroTask_test.cpp
Normal file
537
src/test/core/CoroTask_test.cpp
Normal file
@@ -0,0 +1,537 @@
|
||||
#include <test/jtx.h>
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/core/JobQueueAwaiter.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
|
||||
namespace xrpl {
|
||||
namespace test {
|
||||
|
||||
/**
|
||||
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
|
||||
* and JobQueueAwaiter.
|
||||
*
|
||||
* Dependency Diagram
|
||||
* ==================
|
||||
*
|
||||
* CoroTask_test
|
||||
* +-------------------------------------------------+
|
||||
* | + gate (inner class) : condition_variable helper |
|
||||
* +-------------------------------------------------+
|
||||
* | uses
|
||||
* v
|
||||
* jtx::Env --> JobQueue::postCoroTask()
|
||||
* |
|
||||
* +-- CoroTaskRunner (suspend / post / resume)
|
||||
* +-- CoroTask<void> / CoroTask<T>
|
||||
* +-- JobQueueAwaiter
|
||||
*
|
||||
* Test Coverage Matrix
|
||||
* ====================
|
||||
*
|
||||
* Test | Primitives exercised
|
||||
* --------------------------+----------------------------------------------
|
||||
* testVoidCompletion | CoroTask<void> basic lifecycle
|
||||
* testCorrectOrder | suspend() -> join() -> post() -> complete
|
||||
* testIncorrectOrder | post() before suspend() (race-safe path)
|
||||
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
|
||||
* testThreadSpecificStorage | LocalValue isolation across coroutines
|
||||
* testExceptionPropagation | unhandled_exception() in promise_type
|
||||
* testMultipleYields | N sequential suspend/resume cycles
|
||||
* testValueReturn | CoroTask<T> co_return value
|
||||
* testValueException | CoroTask<T> exception via co_await
|
||||
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
|
||||
* testShutdownRejection | postCoroTask returns nullptr when stopping
|
||||
*/
|
||||
class CoroTask_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Simple one-shot gate for synchronizing between test thread
|
||||
* and coroutine worker threads. signal() sets the flag;
|
||||
* wait_for() blocks until signaled or timeout.
|
||||
*/
|
||||
class gate
|
||||
{
|
||||
private:
|
||||
std::condition_variable cv_;
|
||||
std::mutex mutex_;
|
||||
bool signaled_ = false;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Block until signaled or timeout expires.
|
||||
*
|
||||
* @param rel_time Maximum duration to wait
|
||||
*
|
||||
* @return true if signaled before timeout
|
||||
*/
|
||||
template <class Rep, class Period>
|
||||
bool
|
||||
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_);
|
||||
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
|
||||
signaled_ = false;
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal the gate, waking any waiting thread.
|
||||
*/
|
||||
void
|
||||
signal()
|
||||
{
|
||||
std::lock_guard lk(mutex_);
|
||||
signaled_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
/**
|
||||
* CoroTask<void> runs to completion and runner becomes non-runnable.
|
||||
*/
|
||||
void
|
||||
testVoidCompletion()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("void completion");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* Correct order: suspend, join, post, complete.
|
||||
* Mirrors existing Coroutine_test::correct_order.
|
||||
*/
|
||||
void
|
||||
testCorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("correct order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
g1p->signal();
|
||||
co_await runner->suspend();
|
||||
g2p->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g1.wait_for(5s));
|
||||
runner->join();
|
||||
runner->post();
|
||||
BEAST_EXPECT(g2.wait_for(5s));
|
||||
runner->join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Incorrect order: post() before suspend(). Verifies the
|
||||
* race-safe path. Mirrors Coroutine_test::incorrect_order.
|
||||
*/
|
||||
void
|
||||
testIncorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("incorrect order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
|
||||
runner->post();
|
||||
co_await runner->suspend();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
}
|
||||
|
||||
/**
|
||||
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
|
||||
*/
|
||||
void
|
||||
testJobQueueAwaiter()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("JobQueueAwaiter");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int step = 0;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*sp = 1;
|
||||
co_await JobQueueAwaiter{runner};
|
||||
*sp = 2;
|
||||
co_await JobQueueAwaiter{runner};
|
||||
*sp = 3;
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(step == 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-coroutine LocalValue isolation. Each coroutine sees its own
|
||||
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
|
||||
*/
|
||||
void
|
||||
testThreadSpecificStorage()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("thread specific storage");
|
||||
Env env(*this);
|
||||
|
||||
auto& jq = env.app().getJobQueue();
|
||||
|
||||
static int const N = 4;
|
||||
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
|
||||
|
||||
LocalValue<int> lv(-1);
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
|
||||
gate g;
|
||||
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
|
||||
this->BEAST_EXPECT(*lv == -1);
|
||||
*lv = -2;
|
||||
this->BEAST_EXPECT(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
jq.postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
|
||||
(*ap)[id] = runner;
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == -1);
|
||||
**lvp = id;
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
a[i]->join();
|
||||
}
|
||||
for (auto const& r : a)
|
||||
{
|
||||
r->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
r->join();
|
||||
}
|
||||
for (auto const& r : a)
|
||||
{
|
||||
r->post();
|
||||
r->join();
|
||||
}
|
||||
|
||||
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
|
||||
this->BEAST_EXPECT(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exception thrown in coroutine body is caught by
|
||||
* promise_type::unhandled_exception(). Coroutine completes.
|
||||
*/
|
||||
void
|
||||
testExceptionPropagation()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("exception propagation");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
throw std::runtime_error("test exception");
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
// The exception is caught by promise_type::unhandled_exception()
|
||||
// and the coroutine is considered done
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* Multiple sequential suspend/resume cycles via co_await.
|
||||
*/
|
||||
void
|
||||
testMultipleYields()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("multiple yields");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int counter = 0;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
++(*cp);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 1);
|
||||
runner->join();
|
||||
|
||||
runner->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 2);
|
||||
runner->join();
|
||||
|
||||
runner->post();
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(counter == 3);
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* CoroTask<T> returns a value via co_return. Outer coroutine
|
||||
* extracts it with co_await.
|
||||
*/
|
||||
void
|
||||
testValueReturn()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value return");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> { co_return 42; };
|
||||
*rp = co_await inner();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 42);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* CoroTask<T> propagates exceptions from inner coroutines.
|
||||
* Outer coroutine catches via try/catch around co_await.
|
||||
*/
|
||||
void
|
||||
testValueException()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value exception");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
bool caught = false;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> {
|
||||
throw std::runtime_error("inner error");
|
||||
co_return 0;
|
||||
};
|
||||
try
|
||||
{
|
||||
co_await inner();
|
||||
}
|
||||
catch (std::runtime_error const& e)
|
||||
{
|
||||
*cp = true;
|
||||
}
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(caught);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* CoroTask<T> chaining. Nested value-returning coroutines
|
||||
* compose via co_await.
|
||||
*/
|
||||
void
|
||||
testValueChaining()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value chaining");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
|
||||
auto mul = [add](int a, int b) -> CoroTask<int> {
|
||||
int sum = co_await add(a, b);
|
||||
co_return sum * 2;
|
||||
};
|
||||
*rp = co_await mul(3, 4);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 14); // (3 + 4) * 2
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* postCoroTask returns nullptr when JobQueue is stopping.
|
||||
*/
|
||||
void
|
||||
testShutdownRejection()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("shutdown rejection");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
// Stop the JobQueue
|
||||
env.app().getJobQueue().stop();
|
||||
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
|
||||
BEAST_EXPECT(!runner);
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testVoidCompletion();
|
||||
testCorrectOrder();
|
||||
testIncorrectOrder();
|
||||
testJobQueueAwaiter();
|
||||
testThreadSpecificStorage();
|
||||
testExceptionPropagation();
|
||||
testMultipleYields();
|
||||
testValueReturn();
|
||||
testValueException();
|
||||
testValueChaining();
|
||||
testShutdownRejection();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
|
||||
|
||||
} // namespace test
|
||||
} // namespace xrpl
|
||||
@@ -74,7 +74,6 @@ sig::operator()(Env& env, JTx& jt) const
|
||||
|
||||
Serializer msg;
|
||||
serializeBatch(msg, stx.getFlags(), stx.getBatchTransactionIDs());
|
||||
finishMultiSigningData(e.acct.id(), msg);
|
||||
auto const sig = xrpl::sign(*publicKeyType(e.sig.pk().slice()), e.sig.sk(), msg.slice());
|
||||
jo[sfTxnSignature.getJsonName()] = strHex(Slice{sig.data(), sig.size()});
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ class Feature_test : public beast::unit_test::suite
|
||||
// or removed, swap out for any other feature.
|
||||
BEAST_EXPECT(
|
||||
featureToName(fixRemoveNFTokenAutoTrustLine) == "fixRemoveNFTokenAutoTrustLine");
|
||||
BEAST_EXPECT(featureToName(featureBatchV1_1) == "BatchV1_1");
|
||||
BEAST_EXPECT(featureToName(featureBatch) == "Batch");
|
||||
BEAST_EXPECT(featureToName(featureDID) == "DID");
|
||||
BEAST_EXPECT(featureToName(fixIncludeKeyletFields) == "fixIncludeKeyletFields");
|
||||
BEAST_EXPECT(featureToName(featureTokenEscrow) == "TokenEscrow");
|
||||
|
||||
@@ -1119,8 +1119,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
|
||||
}
|
||||
|
||||
// Enforce Network bar for batch txn
|
||||
if (iTrans->isFlag(tfInnerBatchTxn) &&
|
||||
m_ledgerMaster.getValidatedRules().enabled(featureBatchV1_1))
|
||||
if (iTrans->isFlag(tfInnerBatchTxn) && m_ledgerMaster.getValidatedRules().enabled(featureBatch))
|
||||
{
|
||||
JLOG(m_journal.error()) << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
|
||||
return;
|
||||
@@ -1186,7 +1185,7 @@ NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
|
||||
// under no circumstances will we ever accept an inner txn within a batch
|
||||
// txn from the network.
|
||||
auto const sttx = *transaction->getSTransaction();
|
||||
if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatchV1_1))
|
||||
if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
|
||||
{
|
||||
transaction->setStatus(INVALID);
|
||||
transaction->setResult(temINVALID_FLAG);
|
||||
|
||||
@@ -1291,7 +1291,7 @@ PeerImp::handleTransaction(
|
||||
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
|
||||
// LCOV_EXCL_START
|
||||
/*
|
||||
There is no need to check whether the featureBatchV1_1 amendment is
|
||||
There is no need to check whether the featureBatch amendment is
|
||||
enabled.
|
||||
|
||||
* If the `tfInnerBatchTxn` flag is set, and the amendment is
|
||||
@@ -2740,7 +2740,7 @@ PeerImp::checkTransaction(
|
||||
// charge strongly for relaying batch txns
|
||||
// LCOV_EXCL_START
|
||||
/*
|
||||
There is no need to check whether the featureBatchV1_1 amendment is
|
||||
There is no need to check whether the featureBatch amendment is
|
||||
enabled.
|
||||
|
||||
* If the `tfInnerBatchTxn` flag is set, and the amendment is
|
||||
|
||||
Reference in New Issue
Block a user