Compare commits

..

5 Commits

Author SHA1 Message Date
Michael Legleux
8585055d6e fix output 2026-02-27 17:20:40 -08:00
Michael Legleux
a35308399d readable ui output 2026-02-27 17:16:57 -08:00
Michael Legleux
8068a7b513 Trigger run 2026-02-27 15:57:17 -08:00
Vito Tumas
1a7f824b89 refactor: Splits invariant checks into multiple classes (#6440)
The invariant check system had grown into a single monolithic file pair containing 24 invariant checker classes. The large `InvariantCheck.cpp` file was a frequent source of merge conflicts and difficult to navigate. This refactoring improves maintainability and readability with zero behavioral changes.

In particular, this change:
- Splits `InvariantCheck.h` and `InvariantCheck.cpp` into 10 focused header/source pairs organized by domain under a new `invariants/` subdirectory.
- Extracts the shared `Privilege` enum and `hasPrivilege()` function into a dedicated `InvariantCheckPrivilege.h` header, so domain-specific files can reference them independently.
2026-02-27 21:02:39 +00:00
Sergey Kuznetsov
b58c681189 chore: Make nix hook optional (#6431)
This change makes the `nix` pre-commit hook optional in development environments, and enforced only inside Github Actions.
2026-02-27 13:36:10 -05:00
53 changed files with 4945 additions and 8227 deletions

View File

@@ -32,10 +32,13 @@ We will further set additional CMake arguments as follows:
"""
def generate_strategy_matrix(all: bool, config: Config) -> list:
def generate_strategy_matrix(all: bool, config: Config, distro: str = "") -> list:
configurations = []
os_entries = config.os
if distro:
os_entries = [o for o in os_entries if o["distro_name"] == distro]
for architecture, os, build_type, cmake_args in itertools.product(
config.architecture, config.os, config.build_type, config.cmake_args
config.architecture, os_entries, config.build_type, config.cmake_args
):
# The default CMake target is 'all' for Linux and MacOS and 'install'
# for Windows, but it can get overridden for certain configurations.
@@ -223,7 +226,7 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
if (n := os["compiler_version"]) != "":
config_name += f"-{n}"
config_name += (
f"-{architecture['platform'][architecture['platform'].find('/')+1:]}"
f"-{architecture['platform'][architecture['platform'].find('/') + 1 :]}"
)
config_name += f"-{build_type.lower()}"
if "-Dcoverage=ON" in cmake_args:
@@ -313,21 +316,32 @@ if __name__ == "__main__":
required=False,
type=Path,
)
parser.add_argument(
"-d",
"--distro",
help="Filter OS entries to only include those with this distro_name (e.g. 'debian', 'rhel', 'ubuntu').",
required=False,
type=str,
default="",
)
args = parser.parse_args()
matrix = []
if args.config is None or args.config == "":
matrix += generate_strategy_matrix(
args.all, read_config(THIS_DIR / "linux.json")
args.all, read_config(THIS_DIR / "linux.json"), args.distro
)
matrix += generate_strategy_matrix(
args.all, read_config(THIS_DIR / "macos.json")
args.all, read_config(THIS_DIR / "macos.json"), args.distro
)
matrix += generate_strategy_matrix(
args.all, read_config(THIS_DIR / "windows.json")
args.all, read_config(THIS_DIR / "windows.json"), args.distro
)
else:
matrix += generate_strategy_matrix(args.all, read_config(args.config))
matrix += generate_strategy_matrix(
args.all, read_config(args.config), args.distro
)
# Generate the strategy matrix.
print(f"matrix={json.dumps({'include': matrix})}")
# print(json.dumps(matrix, indent=2))

View File

@@ -128,12 +128,23 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [linux, macos, windows]
include:
- os: linux
distro: debian
- os: linux
distro: rhel
- os: linux
distro: ubuntu
- os: macos
distro: ""
- os: windows
distro: ""
with:
# Enable ccache only for events targeting the XRPLF repository, since
# other accounts will not have access to our remote cache storage.
ccache_enabled: ${{ github.repository_owner == 'XRPLF' }}
os: ${{ matrix.os }}
distro: ${{ matrix.distro }}
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -77,7 +77,17 @@ jobs:
strategy:
fail-fast: ${{ github.event_name == 'merge_group' }}
matrix:
os: [linux, macos, windows]
include:
- os: linux
distro: debian
- os: linux
distro: rhel
- os: linux
distro: ubuntu
- os: macos
distro: ""
- os: windows
distro: ""
with:
# Enable ccache only for events targeting the XRPLF repository, since
# other accounts will not have access to our remote cache storage.
@@ -86,6 +96,7 @@ jobs:
# not identical to a regular compilation.
ccache_enabled: ${{ github.repository_owner == 'XRPLF' && !startsWith(github.ref, 'refs/heads/release') }}
os: ${{ matrix.os }}
distro: ${{ matrix.distro }}
strategy_matrix: ${{ github.event_name == 'schedule' && 'all' || 'minimal' }}
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -11,7 +11,7 @@ on:
jobs:
# Call the workflow in the XRPLF/actions repo that runs the pre-commit hooks.
run-hooks:
uses: XRPLF/actions/.github/workflows/pre-commit.yml@320be44621ca2a080f05aeb15817c44b84518108
uses: XRPLF/actions/.github/workflows/pre-commit.yml@56de1bdf19639e009639a50b8d17c28ca954f267
with:
runs_on: ubuntu-latest
container: '{ "image": "ghcr.io/xrplf/ci/tools-rippled-pre-commit:sha-41ec7c1" }'

View File

@@ -26,6 +26,12 @@ on:
type: string
default: "minimal"
distro:
description: 'Filter to only include configs for this distro (e.g. "debian", "rhel", "ubuntu"). Leave empty for no filtering.'
required: false
type: string
default: ""
secrets:
CODECOV_TOKEN:
description: "The Codecov token to use for uploading coverage reports."
@@ -38,9 +44,11 @@ jobs:
with:
os: ${{ inputs.os }}
strategy_matrix: ${{ inputs.strategy_matrix }}
distro: ${{ inputs.distro }}
# Build and test the binary for each configuration.
build-test-config:
name: ${{ matrix.config_name }}
needs:
- generate-matrix
uses: ./.github/workflows/reusable-build-test-config.yml

View File

@@ -13,6 +13,11 @@ on:
required: false
type: string
default: "minimal"
distro:
description: 'Filter to only include configs for this distro (e.g. "debian", "rhel", "ubuntu"). Leave empty for no filtering.'
required: false
type: string
default: ""
outputs:
matrix:
description: "The generated strategy matrix."
@@ -42,4 +47,5 @@ jobs:
env:
GENERATE_CONFIG: ${{ inputs.os != '' && format('--config={0}.json', inputs.os) || '' }}
GENERATE_OPTION: ${{ inputs.strategy_matrix == 'all' && '--all' || '' }}
run: ./generate.py ${GENERATE_OPTION} ${GENERATE_CONFIG} >> "${GITHUB_OUTPUT}"
GENERATE_DISTRO: ${{ inputs.distro != '' && format('--distro={0}', inputs.distro) || '' }}
run: ./generate.py ${GENERATE_OPTION} ${GENERATE_CONFIG} ${GENERATE_DISTRO} >> "${GITHUB_OUTPUT}"

View File

@@ -61,7 +61,15 @@ repos:
hooks:
- id: nix-fmt
name: Format Nix files
entry: nix --extra-experimental-features 'nix-command flakes' fmt
entry: |
bash -c '
if command -v nix &> /dev/null || [ "$GITHUB_ACTIONS" = "true" ]; then
nix --extra-experimental-features "nix-command flakes" fmt "$@"
else
echo "Skipping nix-fmt: nix not installed and not in GitHub Actions"
exit 0
fi
' --
language: system
types:
- nix

File diff suppressed because it is too large Load Diff

View File

@@ -16,6 +16,8 @@ find_dependency(Boost
COMPONENTS
chrono
container
context
coroutine
date_time
filesystem
program_options

View File

@@ -22,6 +22,7 @@ target_compile_definitions(
BOOST_FILESYSTEM_NO_DEPRECATED
>
$<$<NOT:$<BOOL:${boost_show_deprecated}>>:
BOOST_COROUTINES_NO_DEPRECATION_WARNING
BOOST_BEAST_ALLOW_DEPRECATED
BOOST_FILESYSTEM_DEPRECATED
>

View File

@@ -4,6 +4,7 @@ include(XrplSanitizers)
find_package(Boost REQUIRED
COMPONENTS chrono
container
coroutine
date_time
filesystem
json
@@ -20,6 +21,7 @@ target_link_libraries(
INTERFACE Boost::headers
Boost::chrono
Boost::container
Boost::coroutine
Boost::date_time
Boost::filesystem
Boost::json

View File

@@ -196,6 +196,7 @@ class Xrpl(ConanFile):
"boost::headers",
"boost::chrono",
"boost::container",
"boost::coroutine",
"boost::date_time",
"boost::filesystem",
"boost::json",

View File

@@ -71,14 +71,12 @@ words:
- coldwallet
- compr
- conanfile
- cppcoro
- conanrun
- confs
- connectability
- coro
- coros
- cowid
- cppcoro
- cryptocondition
- cryptoconditional
- cryptoconditions
@@ -101,14 +99,11 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fcontext
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -190,7 +185,6 @@ words:
- ostr
- pargs
- partitioner
- pratik
- paychan
- paychans
- permdex
@@ -198,7 +192,6 @@ words:
- permissioned
- pointee
- populator
- pratik
- preauth
- preauthorization
- preauthorize
@@ -213,7 +206,6 @@ words:
- queuable
- Raphson
- replayer
- repost
- rerere
- retriable
- RIPD
@@ -244,7 +236,6 @@ words:
- soci
- socidb
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue

122
include/xrpl/core/Coro.ipp Normal file
View File

@@ -0,0 +1,122 @@
#pragma once
#include <xrpl/basics/ByteUtilities.h>
namespace xrpl {
template <class F>
JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f)
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
},
boost::coroutines::attributes(megabytes(1)))
{
}
inline JobQueue::Coro::~Coro()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
#endif
}
inline void
JobQueue::Coro::yield() const
{
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
(*yield_)();
}
inline bool
JobQueue::Coro::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp keeps 'this' alive
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline void
JobQueue::Coro::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(static_cast<bool>(coro_), "xrpl::JobQueue::Coro::resume : is runnable");
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::Coro::runnable() const
{
return static_cast<bool>(coro_);
}
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void
JobQueue::Coro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -1,687 +0,0 @@
#pragma once
#include <coroutine>
#include <exception>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
/**
* CoroTask<void> -- coroutine return type for void-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<void>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - exception_ : std::exception_ptr |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_void() |
* | + unhandled_exception() |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter
* +-----------------------------------------------+
* | await_suspend(h): |
* | if continuation_ set -> symmetric transfer |
* | else -> noop_coroutine |
* +-----------------------------------------------+
*
* Design Notes
* ------------
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
* body does not execute until the handle is explicitly resumed.
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
* of void/bool, allowing the scheduler to jump directly to the next
* coroutine without growing the call stack.
* - Continuation chaining: when one CoroTask is co_await-ed inside
* another, the caller's handle is stored as continuation_ so
* FinalAwaiter can resume it when this task finishes.
* - Move-only: the handle is exclusively owned; copy is deleted.
*
* Usage Examples
* ==============
*
* 1. Basic void coroutine (the most common case in rippled):
*
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
* // do something
* co_await runner->suspend(); // yield control
* // resumed later via runner->post() or runner->resume()
* co_return;
* }
*
* 2. co_await-ing one CoroTask<void> from another (chaining):
*
* CoroTask<void> inner() {
* // ...
* co_return;
* }
* CoroTask<void> outer() {
* co_await inner(); // continuation_ links outer -> inner
* co_return; // FinalAwaiter resumes outer
* }
*
* 3. Exceptions propagate through co_await:
*
* CoroTask<void> failing() {
* throw std::runtime_error("oops");
* co_return;
* }
* CoroTask<void> caller() {
* try { co_await failing(); }
* catch (std::runtime_error const&) { // caught here }
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Dangling references in coroutine parameters.
* Coroutine parameters are copied into the frame, but references
* are NOT -- they are stored as-is. If the referent goes out of scope
* before the coroutine finishes, you get use-after-free.
*
* // BROKEN -- local dies before coroutine runs:
* CoroTask<void> bad(int& ref) { co_return; }
* void launch() {
* int local = 42;
* auto task = bad(local); // frame stores &local
* } // local destroyed; frame holds dangling ref
*
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
*
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
* When a lambda that returns CoroTask captures by reference ([&]),
* GCC 14 may generate a corrupted coroutine frame. Always capture
* by explicit pointer-to-value instead:
*
* // BROKEN on GCC 14:
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
*
* // FIX -- capture pointers explicitly:
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
*
* BUG-RISK: Resuming a destroyed or completed CoroTask.
* Calling handle().resume() after the coroutine has already run to
* completion (done() == true) is undefined behavior. The CoroTaskRunner
* guards against this with an XRPL_ASSERT, but standalone usage of
* CoroTask must check done() before resuming.
*
* BUG-RISK: Moving a CoroTask that is being awaited.
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
* or destroying A will invalidate the continuation link. Never move
* or reassign a CoroTask while it is mid-execution or being awaited.
*
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
* There is no built-in notification when the coroutine finishes.
* The caller must use external synchronization (e.g. CoroTaskRunner::join
* or a gate/condition_variable) to know when it is done.
*
* LIMITATION: No cancellation token.
* There is no way to cancel a suspended CoroTask from outside. The
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
* after each co_await and co_return early if needed.
*
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
* If a coroutine calls a regular function that wants to "yield", it
* cannot. Only the immediate coroutine body can use co_await.
* This is acceptable for rippled because all yield() sites are shallow.
*/
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise. Compiler uses this to manage coroutine state.
* Stores the exception (if any) and the continuation handle for
* symmetric transfer back to the awaiting coroutine.
*/
struct promise_type
{
// Captured exception from the coroutine body, rethrown in
// await_resume() when this task is co_await-ed by a caller.
std::exception_ptr exception_;
// Handle to the coroutine that is co_await-ing this task.
// Set by await_suspend(). FinalAwaiter uses it for symmetric
// transfer back to the caller. Null if this is a top-level task.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. The coroutine body does not execute until the
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Awaiter returned by final_suspend(). Uses symmetric transfer:
* if a continuation exists, transfers control directly to it
* (tail-call, no stack growth). Otherwise returns noop_coroutine
* so the coroutine frame stays alive for the owner to destroy.
*/
struct FinalAwaiter
{
/**
* Always false. We need await_suspend to run for
* symmetric transfer.
*/
bool
await_ready() noexcept
{
return false;
}
/**
* Symmetric transfer: returns the continuation handle so
* the compiler emits a tail-call instead of a nested resume.
* If no continuation is set, returns noop_coroutine to
* suspend at final_suspend without destroying the frame.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
/**
* Returns FinalAwaiter for symmetric transfer at coroutine end.
*/
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return;` (void coroutine).
*/
void
return_void()
{
}
/**
* Called by the compiler when an exception escapes the coroutine
* body. Captures it for later rethrowing in await_resume().
*/
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `co_await someCoroTask;` --
/**
* Always false. This task is lazy, so co_await always suspends
* the caller to set up the continuation link.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores the caller's handle as our continuation, then returns
* our handle for symmetric transfer (caller suspends, we resume).
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
/**
* Called when the caller resumes after co_await. Rethrows any
* exception captured by unhandled_exception().
*/
void
await_resume()
{
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
/**
* CoroTask<T> -- coroutine return type for value-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<T>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - result_ : variant<monostate, T, |
* | exception_ptr> |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_value(T) -> stores in result_[1] |
* | + unhandled_exception -> stores in result_[2] |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
*
* Value Extraction
* ----------------
* await_resume() inspects the variant:
* - index 2 (exception_ptr) -> rethrow
* - index 1 (T) -> return value via move
*
* Usage Examples
* ==============
*
* 1. Simple value return:
*
* CoroTask<int> computeAnswer() { co_return 42; }
*
* CoroTask<void> caller() {
* int v = co_await computeAnswer(); // v == 42
* }
*
* 2. Chaining value-returning coroutines:
*
* CoroTask<int> add(int a, int b) { co_return a + b; }
* CoroTask<int> doubleSum(int a, int b) {
* int s = co_await add(a, b);
* co_return s * 2;
* }
*
* 3. Exception propagation from inner to outer:
*
* CoroTask<int> failing() {
* throw std::runtime_error("bad");
* co_return 0; // never reached
* }
* CoroTask<void> caller() {
* try {
* int v = co_await failing(); // throws here
* } catch (std::runtime_error const& e) {
* // e.what() == "bad"
* }
* }
*
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
* ================================================================
*
* BUG-RISK: await_resume() moves the value out of the variant.
* Calling co_await on the same CoroTask<T> instance twice is undefined
* behavior -- the second call will see a moved-from T. CoroTask is
* single-shot: one co_return, one co_await.
*
* BUG-RISK: T must be move-constructible.
* return_value(T) takes by value and moves into the variant.
* Types that are not movable cannot be used as T.
*
* LIMITATION: No co_yield support.
* CoroTask<T> only supports a single co_return. It does not implement
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
* compile error. For streaming values, a different return type
* (e.g. Generator<T>) would be needed.
*
* LIMITATION: Result is only accessible via co_await.
* There is no .get() or .result() method. The value can only be
* extracted by co_await-ing the CoroTask<T> from inside another
* coroutine. For extracting results in non-coroutine code, pass a
* pointer to the caller and write through it (as the tests do).
*/
template <typename T>
class CoroTask
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise for value-returning coroutines.
* Stores the result as a variant: monostate (not yet set),
* T (co_return value), or exception_ptr (unhandled exception).
*/
struct promise_type
{
// Tri-state result:
// index 0 (monostate) -- coroutine has not yet completed
// index 1 (T) -- co_return value stored here
// index 2 (exception) -- unhandled exception captured here
std::variant<std::monostate, T, std::exception_ptr> result_;
// Handle to the coroutine co_await-ing this task. Used by
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. Coroutine body does not run until explicitly resumed.
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Symmetric-transfer awaiter at coroutine completion.
* Same pattern as CoroTask<void>::FinalAwaiter.
*/
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
/**
* Returns continuation for symmetric transfer, or
* noop_coroutine if this is a top-level task.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return value;`.
* Moves the value into result_ at index 1.
*
* @param value The value to store
*/
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
/**
* Captures unhandled exceptions at index 2 of result_.
* Rethrown later in await_resume().
*/
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
/**
* Always false. co_await always suspends to set up continuation.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores caller as continuation, returns our handle for
* symmetric transfer.
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_;
}
/**
* Extracts the result: rethrows if exception, otherwise moves
* the T value out of the variant. Single-shot: calling twice
* on the same task is undefined (moved-from T).
*
* @return The co_return-ed value
*/
T
await_resume()
{
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
} // namespace xrpl

View File

@@ -1,321 +0,0 @@
#pragma once
/**
* @file CoroTaskRunner.ipp
*
* CoroTaskRunner inline implementation.
*
* This file contains the business logic for managing C++20 coroutines
* on the JobQueue. It is included at the bottom of JobQueue.h.
*
* Data Flow: suspend / post / resume cycle
* =========================================
*
* coroutine body CoroTaskRunner JobQueue
* -------------- -------------- --------
* |
* co_await runner->suspend()
* |
* +--- await_suspend ------> onSuspend()
* | ++nSuspend_ ------------> nSuspend_
* | [coroutine is now suspended]
* |
* . (externally or by JobQueueAwaiter)
* .
* +--- (caller calls) -----> post()
* | running_ = true
* | addJob(resume) ----------> job enqueued
* | |
* | [worker picks up]
* | |
* +--- <----- resume() <-----------------------------------+
* | --nSuspend_ ------> nSuspend_
* | swap in LocalValues (lvs_)
* | task_.handle().resume()
* | |
* | [coroutine body continues here]
* | |
* | swap out LocalValues
* | running_ = false
* | cv_.notify_all()
* v
*
* Thread Safety
* =============
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
* races cannot resume the coroutine while it is still running.
* (See the race condition discussion in JobQueue.h)
* - mutex_run_ : guards running_ flag; used by join() to wait for completion.
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
*
* Common Mistakes When Modifying This File
* =========================================
*
* 1. Changing lock ordering.
* resume() acquires locks in this order: mutex_run_ -> jq_.m_mutex -> mutex_.
* Acquiring them in a different order WILL deadlock. Any new code path
* that touches these mutexes must follow the same order.
*
* 2. Removing the shared_from_this() capture in post().
* The lambda passed to addJob captures [this, sp = shared_from_this()].
* If you remove sp, 'this' can be destroyed before the job runs,
* causing use-after-free. The sp capture is load-bearing.
*
* 3. Forgetting to decrement nSuspend_ on a new code path.
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
* suspension path (e.g. a new awaiter) and forget to decrement on resume
* or on failure, JobQueue::stop() will hang.
*
* 4. Calling task_.handle().resume() without holding mutex_.
* This allows a race where the coroutine runs on two threads
* simultaneously. Always hold mutex_ around resume().
*
* 5. Swapping LocalValues outside of the mutex_ critical section.
* The swap-in and swap-out of LocalValues must bracket the resume()
* call. If you move the swap-out before the lock_guard(mutex_) is
* released, you break LocalValue isolation for any code that runs
* after the coroutine suspends but before the lock is dropped.
*/
//
namespace xrpl {
/**
* Construct a CoroTaskRunner. Sets running_ to false; does not
* create the coroutine. Call init() afterwards.
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), running_(false)
{
}
/**
* Initialize with a coroutine-returning callable.
* Stores the callable on the heap (FuncStore) so it outlives the
* coroutine frame. Coroutine frames store a reference to the
* callable's implicit object parameter (the lambda). If the callable
* is a temporary, that reference dangles after the caller returns.
* Keeping the callable alive here ensures the coroutine's captures
* remain valid.
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
using Fn = std::decay_t<F>;
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
task_ = store->func(shared_from_this());
storedFunc_ = std::move(store);
}
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
*/
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
/**
* Decrement nSuspend_ without resuming.
*/
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
/**
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
* before the coroutine actually suspends. The caller must later call
* post() or resume() to continue execution.
*
* @return Awaiter for use with `co_await runner->suspend()`
*/
inline auto
JobQueue::CoroTaskRunner::suspend()
{
/**
* Custom awaiter for suspend(). Always suspends (await_ready
* returns false) and increments nSuspend_ in await_suspend().
*/
struct SuspendAwaiter
{
CoroTaskRunner& runner_; // The runner that owns this coroutine.
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Called when the coroutine suspends. Increments nSuspend_
* so the JobQueue knows a coroutine is waiting.
*/
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
/**
* Schedule coroutine resumption as a job on the JobQueue.
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
* destroyed while the job is queued but not yet executed.
*
* @return false if the JobQueue rejected the job (shutting down)
*/
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
/**
* Resume the coroutine on the current thread.
*
* Steps:
* 1. Set running_ = true (under mutex_run_)
* 2. Decrement nSuspend_ (under jq_.m_mutex)
* 3. Swap in this coroutine's LocalValues for thread-local isolation
* 4. Resume the coroutine handle (under mutex_)
* 5. Swap out LocalValues, restoring the thread's previous state
* 6. Set running_ = false and notify join() waiters
*/
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
if (task_.done())
{
#ifndef NDEBUG
finished_ = true;
#endif
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// Use std::move (not task_ = {}) so task_.handle_ is null BEFORE the
// frame is destroyed. operator= would destroy the frame while handle_
// still holds the old value -- a re-entrancy hazard on GCC-12 if
// frame destruction triggers runner cleanup.
[[maybe_unused]] auto completed = std::move(task_);
}
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
/**
* @return true if the coroutine has not yet run to completion
*/
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
// After normal completion, task_ is reset to break the shared_ptr cycle
// (handle_ becomes null). A null handle means the coroutine is done.
return task_.handle() && !task_.done();
}
/**
* Handle early termination when the coroutine never ran (e.g. JobQueue
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
*/
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Use std::move (not task_ = {}) so
// task_.handle_ is null before the frame is destroyed.
{
[[maybe_unused]] auto completed = std::move(task_);
}
storedFunc_.reset();
}
/**
* Block until the coroutine finishes its current execution slice.
* Uses cv_ + mutex_run_ to wait until running_ == false.
*/
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -2,13 +2,13 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
#include <xrpl/json/json_value.h>
#include <coroutine>
#include <boost/coroutine/all.hpp>
#include <set>
namespace xrpl {
@@ -18,6 +18,10 @@ class PerfLog;
}
class Logs;
struct Coro_create_t
{
explicit Coro_create_t() = default;
};
/** A pool of threads to perform work.
@@ -32,380 +36,85 @@ class Logs;
class JobQueue : private Workers::Callback
{
public:
/** C++20 coroutine lifecycle manager.
*
* Class / Inheritance / Dependency Diagram
* =========================================
*
* std::enable_shared_from_this<CoroTaskRunner>
* ^
* | (public inheritance)
* |
* CoroTaskRunner
* +---------------------------------------------------+
* | - lvs_ : detail::LocalValues |
* | - jq_ : JobQueue& |
* | - type_ : JobType |
* | - name_ : std::string |
* | - running_ : bool |
* | - mutex_ : std::mutex (coroutine guard) |
* | - mutex_run_ : std::mutex (join guard) |
* | - cv_ : condition_variable |
* | - task_ : CoroTask<void> |
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
* +---------------------------------------------------+
* | + init(F&&) : set up coroutine callable |
* | + onSuspend() : ++jq_.nSuspend_ |
* | + onUndoSuspend() : --jq_.nSuspend_ |
* | + suspend() : returns SuspendAwaiter |
* | + post() : schedule resume on JobQueue |
* | + resume() : resume coroutine on caller |
* | + runnable() : !task_.done() |
* | + expectEarlyExit() : teardown for failed post |
* | + join() : block until not running |
* +---------------------------------------------------+
* | |
* | owns | references
* v v
* CoroTask<void> JobQueue
* (coroutine frame) (thread pool + nSuspend_)
*
* FuncBase / FuncStore<F> (type-erased heap storage
* for the coroutine lambda)
*
* Coroutine Lifecycle (Control Flow)
* ===================================
*
* Caller thread JobQueue worker thread
* ------------- ----------------------
* postCoroTask(f)
* |
* +-- check stopping_ (reject if JQ shutting down)
* +-- ++nSuspend_ (lazy start counts as suspended)
* +-- make_shared<CoroTaskRunner>
* +-- init(f)
* | +-- store lambda on heap (FuncStore)
* | +-- task_ = f(shared_from_this())
* | [coroutine created, suspended at initial_suspend]
* +-- resume() (synchronous — runs on caller's thread)
* | +-- running_ = true
* | +-- --nSuspend_
* | +-- swap in LocalValues
* | +-- task_.handle().resume()
* | | [coroutine body runs to first co_await or co_return]
* | | ...
* | | co_await suspend()
* | | +-- ++nSuspend_
* | | [coroutine suspends]
* | +-- swap out LocalValues
* | +-- running_ = false
* | +-- cv_.notify_all()
* |
* post() <-- called externally or by JobQueueAwaiter
* +-- addJob(type_, [resume]{})
* resume()
* |
* +-- [coroutine body continues]
* +-- co_return
* +-- running_ = false
* +-- cv_.notify_all()
* join()
* +-- cv_.wait([]{!running_})
* +-- [done]
*
* Usage Examples
* ==============
*
* 1. Fire-and-forget coroutine (most common pattern):
*
* jq.postCoroTask(jtCLIENT, "MyWork",
* [](auto runner) -> CoroTask<void> {
* doSomeWork();
* co_await runner->suspend(); // yield to other jobs
* doMoreWork();
* co_return;
* });
*
* 2. Manually controlling suspend / resume (external trigger):
*
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
* [&result](auto runner) -> CoroTask<void> {
* startAsyncOperation(callback);
* co_await runner->suspend();
* // callback called runner->post() to get here
* result = collectResult();
* co_return;
* });
* // ... later, from the callback:
* runner->post(); // reschedule the coroutine on the JobQueue
*
* 3. Using JobQueueAwaiter for automatic suspend + repost:
*
* jq.postCoroTask(jtCLIENT, "AutoRepost",
* [](auto runner) -> CoroTask<void> {
* step1();
* co_await JobQueueAwaiter{runner}; // yield + auto-repost
* step2();
* co_await JobQueueAwaiter{runner};
* step3();
* co_return;
* });
*
* 4. Checking shutdown after co_await (cooperative cancellation):
*
* jq.postCoroTask(jtCLIENT, "Cancellable",
* [&jq](auto runner) -> CoroTask<void> {
* while (moreWork()) {
* co_await JobQueueAwaiter{runner};
* if (jq.isStopping())
* co_return; // bail out cleanly
* processNextItem();
* }
* co_return;
* });
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Calling suspend() without a matching post()/resume().
* After co_await runner->suspend(), the coroutine is parked and
* nSuspend_ is incremented. If nothing ever calls post() or
* resume(), the coroutine is leaked and JobQueue::stop() will
* hang forever waiting for nSuspend_ to reach zero.
*
* BUG-RISK: Calling post() on an already-running coroutine.
* post() schedules a resume() job. If the coroutine has not
* actually suspended yet (no co_await executed), the resume job
* will try to call handle().resume() while the coroutine is still
* running on another thread. This is UB. The mutex_ prevents
* data corruption but the logic is wrong — always co_await
* suspend() before calling post(). (The test testIncorrectOrder
* shows this works only because mutex_ serializes the calls.)
*
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
* The CoroTaskRunner destructor asserts (!finished_ is false).
* If you let the last shared_ptr die while the coroutine is still
* running or suspended, you get an assertion failure in debug and
* UB in release. Always call join() or expectEarlyExit() first.
*
* BUG-RISK: Lambda captures outliving the coroutine frame.
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
* to prevent dangling. But objects captured by pointer still need
* their own lifetime management. If you capture a raw pointer to
* a stack variable, and the stack frame exits before the coroutine
* finishes, the pointer dangles. Use shared_ptr or ensure the
* pointed-to object outlives the coroutine.
*
* BUG-RISK: Forgetting co_return in a void coroutine.
* If the coroutine body falls off the end without co_return,
* the compiler may silently treat it as co_return (per standard),
* but some compilers warn. Always write explicit co_return.
*
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
* The task_ member is CoroTask<void>. To return values from
* the top-level coroutine, write through a captured pointer
* (as the tests demonstrate), or co_await inner CoroTask<T>
* coroutines that return values.
*
* LIMITATION: One coroutine per CoroTaskRunner.
* init() must be called exactly once. You cannot reuse a
* CoroTaskRunner to run a second coroutine. Create a new one
* via postCoroTask() instead.
*
* LIMITATION: No timeout on join().
* join() blocks indefinitely. If the coroutine is suspended
* and never posted, join() will deadlock. Use timed waits
* on the gate pattern (condition_variable + wait_for) in tests.
*/
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
private:
// Per-coroutine thread-local storage. Swapped in before resume()
// and swapped out after, so each coroutine sees its own LocalValue
// state regardless of which worker thread executes it.
detail::LocalValues lvs_;
// Back-reference to the owning JobQueue. Used to post jobs,
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
JobQueue& jq_;
// Job type passed to addJob() when posting this coroutine.
JobType type_;
// Human-readable name for this coroutine job (for logging).
std::string name_;
// True while the coroutine is actively executing on a thread.
// Guarded by mutex_run_. join() blocks until this is false.
bool running_;
// Guards task_.handle().resume() to prevent the coroutine from
// running on two threads simultaneously. Handles the race where
// post() enqueues a resume before the coroutine has actually
// suspended (post-before-suspend pattern).
std::mutex mutex_;
// Guards running_ flag. Used with cv_ for join() to wait
// until the coroutine finishes its current execution slice.
std::mutex mutex_run_;
// Notified when running_ transitions to false, allowing
// join() waiters to wake up.
std::condition_variable cv_;
// The coroutine handle wrapper. Owns the coroutine frame.
// Set by init(), reset to empty by expectEarlyExit() on
// early termination.
CoroTask<void> task_;
/**
* Type-erased base for heap-stored callables.
* Prevents the coroutine lambda from being destroyed before
* the coroutine frame is done with it.
*
* @see FuncStore
*/
struct FuncBase
{
virtual ~FuncBase() = default;
};
/**
* Concrete type-erased storage for a callable of type F.
* The coroutine frame stores a reference to the lambda's implicit
* object parameter. If the lambda is a temporary, that reference
* dangles after the call returns. FuncStore keeps it alive on
* the heap for the lifetime of the CoroTaskRunner.
*/
template <class F>
struct FuncStore : FuncBase
{
F func; // The stored callable (coroutine lambda).
explicit FuncStore(F&& f) : func(std::move(f))
{
}
};
// Heap-allocated callable storage. Set by init(), ensures the
// lambda outlives the coroutine frame that references it.
std::unique_ptr<FuncBase> storedFunc_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
// Debug-only flag. True once the coroutine has completed or
// expectEarlyExit() was called. Asserted in the destructor
// to catch leaked runners.
bool finished_ = false;
#endif
public:
/**
* Tag type for private construction. Prevents external code
* from constructing CoroTaskRunner directly. Use postCoroTask().
*/
struct create_t
{
explicit create_t() = default;
};
/**
* Construct a CoroTaskRunner. Private by convention (create_t tag).
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
~CoroTaskRunner();
/**
* Initialize with a coroutine-returning callable.
* Must be called exactly once, after the object is managed by
* shared_ptr (because init uses shared_from_this internally).
* This is handled automatically by postCoroTask().
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
// Private: Used in the implementation
template <class F>
Coro(Coro_create_t, JobQueue&, JobType, std::string const&, F&&);
// Not copy-constructible or assignable
Coro(Coro const&) = delete;
Coro&
operator=(Coro const&) = delete;
~Coro();
/** Suspend coroutine execution.
Effects:
The coroutine's stack is saved.
The associated Job thread is released.
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding
post.
*/
void
init(F&& f);
yield() const;
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
* Called when the coroutine is about to suspend. Every call
* must be balanced by a corresponding decrement (via resume()
* or onUndoSuspend()), or JobQueue::stop() will hang.
*/
void
onSuspend();
/** Schedule coroutine execution.
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the
statement after the previous call to yield. Undefined behavior if
called after the coroutine has completed with a return (as opposed to
a yield()). Undefined behavior if post() or resume() called
consecutively without a corresponding yield.
/**
* Decrement nSuspend_ without resuming.
* Used to undo onSuspend() when a scheduled post() fails
* (e.g. JobQueue is stopping).
*/
void
onUndoSuspend();
/**
* Suspend the coroutine.
* The awaiter's await_suspend() increments nSuspend_ before the
* coroutine actually suspends. The caller must later call post()
* or resume() to continue execution.
*
* @return An awaiter for use with `co_await runner->suspend()`
*/
auto
suspend();
/**
* Schedule coroutine resumption as a job on the JobQueue.
* Captures shared_from_this() to prevent this runner from being
* destroyed while the job is queued.
*
* @return true if the job was accepted; false if the JobQueue
* is stopping (caller must handle cleanup)
*/
@return true if the Coro's job is added to the JobQueue.
*/
bool
post();
/**
* Resume the coroutine on the current thread.
* Decrements nSuspend_, swaps in LocalValues, resumes the
* coroutine handle, swaps out LocalValues, and notifies join()
* waiters. Lock ordering: mutex_run_ -> jq_.m_mutex -> mutex_.
*/
/** Resume coroutine execution.
Effects:
The coroutine continues execution from where it last left off
using this same thread.
Undefined behavior if called after the coroutine has completed
with a return (as opposed to a yield()).
Undefined behavior if resume() or post() called consecutively
without a corresponding yield.
*/
void
resume();
/**
* @return true if the coroutine has not yet run to completion
*/
/** Returns true if the Coro is still runnable (has not returned). */
bool
runnable() const;
/**
* Handle early termination when the coroutine never ran.
* Decrements nSuspend_ and destroys the coroutine frame to
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
* Called by postCoroTask() when post() fails.
*/
/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();
/**
* Block until the coroutine finishes its current execution slice.
* Uses cv_ + mutex_run_ to wait until running_ == false.
* Warning: deadlocks if the coroutine is suspended and never posted.
*/
/** Waits until coroutine returns from the user function. */
void
join();
};
@@ -443,18 +152,18 @@ public:
return false;
}
/** Creates a C++20 coroutine and adds a job to the queue to run it.
/** Creates a coroutine and adds a job to the queue which will run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the
job executes.
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
@return shared_ptr to posted Coro. nullptr if post was not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
@@ -508,6 +217,8 @@ public:
isStopped() const;
private:
friend class Coro;
using JobDataMap = std::map<JobType, JobTypeData>;
beast::Journal m_journal;
@@ -608,70 +319,86 @@ private:
getJobLimit(JobType type);
};
/*
An RPC command is received and is handled via ServerHandler(HTTP) or
Handler(websocket), depending on the connection type. The handler then calls
the JobQueue::postCoro() method to create a coroutine and run it at a later
point. This frees up the handler thread and allows it to continue handling
other requests while the RPC command completes its work asynchronously.
postCoro() creates a Coro object. When the Coro ctor is called, and its
coro_ member is initialized (a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note
here that construction of a boost pull_type automatically passes execution to
the coroutine. A pull_type object automatically generates a push_type that is
passed as a parameter (do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, Coro::yield_ is assigned the push_type
parameter (do_yield) address and called (yield()) so we can return execution
back to the caller's stack.
postCoro() then calls Coro::post(), which schedules a job on the job
queue to continue execution of the coroutine in a JobQueue worker thread at
some later time. When the job runs, we lock on the Coro::mutex_ and call
coro_ which continues where we had left off. Since we the last thing we did
in coro_ was call yield(), the next thing we continue with is calling the
function param f, that was passed into Coro ctor. It is within this
function body that the caller specifies what he would like to do while
running in the coroutine and allow them to suspend and resume execution.
A task that relies on other events to complete, such as path finding, calls
Coro::yield() to suspend its execution while waiting on those events to
complete and continue when signaled via the Coro::post() method.
There is a potential race condition that exists here where post() can get
called before yield() after f is called. Technically the problem only occurs
if the job that post() scheduled is executed before yield() is called.
If the post() job were to be executed before yield(), undefined behavior
would occur. The lock ensures that coro_ is not called again until we exit
the coroutine. At which point a scheduled resume() job waiting on the lock
would gain entry, harmlessly call coro_ and immediately return as we have
already completed the coroutine.
The race condition occurs as follows:
1- The coroutine is running.
2- The coroutine is about to suspend, but before it can do so, it must
arrange for some event to wake it up.
3- The coroutine arranges for some event to wake it up.
4- Before the coroutine can suspend, that event occurs and the
resumption of the coroutine is scheduled on the job queue. 5- Again, before
the coroutine can suspend, the resumption of the coroutine is dispatched. 6-
Again, before the coroutine can suspend, the resumption code runs the
coroutine.
The coroutine is now running in two threads.
The lock prevents this from happening as step 6 will block until the
lock is released which only happens after the coroutine completes.
*/
} // namespace xrpl
#include <xrpl/core/CoroTaskRunner.ipp>
#include <xrpl/core/Coro.ipp>
namespace xrpl {
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
//
// Control Flow
// ============
//
// postCoroTask(t, name, f)
// |
// +-- 1. Check stopping_ — reject if JQ shutting down
// |
// +-- 2. ++nSuspend_ (mirrors Boost Coro ctor's implicit yield)
// | The coroutine is "suspended" from the JobQueue's perspective
// | even though it hasn't run yet — this keeps the JQ shutdown
// | logic correct (it waits for nSuspend_ to reach 0).
// |
// +-- 3. Create CoroTaskRunner (shared_ptr, ref-counted)
// |
// +-- 4. runner->init(f)
// | +-- Heap-allocate the lambda (FuncStore) to prevent
// | | dangling captures in the coroutine frame
// | +-- task_ = f(shared_from_this())
// | [coroutine created but NOT started — lazy initial_suspend]
// |
// +-- 5. runner->resume() (synchronous — runs on caller's thread)
// | +-- --nSuspend_
// | +-- task_.handle().resume()
// | | [coroutine body runs until first co_await or co_return]
// | +-- return runner to caller
//
// Why synchronous resume instead of async post()?
// ================================================
// The initial resume runs the coroutine body on the caller's thread to its
// first suspension point (co_await) or completion (co_return). This matches
// the behavioral pattern of the Boost Coro constructor, which ran coroutine
// setup code synchronously. The synchronous approach ensures the coroutine
// frame and its captured shared_ptr are in a determinate state before
// postCoroTask returns, eliminating a class of non-deterministic lifetime
// issues observed with async initial dispatch on GCC-12/15 debug builds.
//
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
// Reject if the JQ is shutting down — matches addJob()'s stopping_ check.
// Must check before incrementing nSuspend_ to avoid leaving an orphan
// count that would cause stop() to hang.
if (stopping_)
return nullptr;
// Account for the initial suspension (CoroTask uses lazy start).
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
*/
auto coro = std::make_shared<Coro>(Coro_create_t{}, *this, t, name, std::forward<F>(f));
if (!coro->post())
{
std::lock_guard lock(m_mutex);
++nSuspend_;
// The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
runner->resume();
return runner;
return coro;
}
} // namespace xrpl

View File

@@ -1,174 +0,0 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/**
* Awaiter that suspends and immediately reschedules on the JobQueue.
* Equivalent to calling yield() followed by post() in the old Coro API.
*
* Usage:
* co_await JobQueueAwaiter{runner};
*
* What it waits for: The coroutine is re-queued as a job and resumes
* when a worker thread picks it up.
*
* Which thread resumes: A JobQueue worker thread.
*
* What await_resume() returns: void.
*
* Dependency Diagram
* ==================
*
* JobQueueAwaiter
* +----------------------------------------------+
* | + runner : shared_ptr<CoroTaskRunner> |
* +----------------------------------------------+
* | + await_ready() -> false (always suspend) |
* | + await_suspend() -> bool (suspend or cancel) |
* | + await_resume() -> void |
* +----------------------------------------------+
* | |
* | uses | uses
* v v
* CoroTaskRunner JobQueue
* .onSuspend() (via runner->post() -> addJob)
* .onUndoSuspend()
* .post()
*
* Control Flow (await_suspend)
* ============================
*
* co_await JobQueueAwaiter{runner}
* |
* +-- await_ready() -> false
* +-- await_suspend(handle)
* |
* +-- runner->onSuspend() // ++nSuspend_
* +-- runner->post() // addJob to JobQueue
* | |
* | +-- success? return true // coroutine stays suspended
* | | // worker thread will call resume()
* | +-- failure? (JQ stopping)
* | +-- runner->onUndoSuspend() // --nSuspend_
* | +-- return false // coroutine continues immediately
* | // so it can clean up and co_return
*
* Usage Examples
* ==============
*
* 1. Yield and auto-repost (most common -- replaces yield() + post()):
*
* CoroTask<void> handler(auto runner) {
* doPartA();
* co_await JobQueueAwaiter{runner}; // yield + repost
* doPartB(); // runs on a worker thread
* co_return;
* }
*
* 2. Multiple yield points in a loop:
*
* CoroTask<void> batchProcessor(auto runner) {
* for (auto& item : items) {
* process(item);
* co_await JobQueueAwaiter{runner}; // let other jobs run
* }
* co_return;
* }
*
* 3. Graceful shutdown -- checking after resume:
*
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
* while (hasWork()) {
* co_await JobQueueAwaiter{runner};
* // If JQ is stopping, await_suspend returns false and
* // the coroutine continues immediately without re-queuing.
* // Always check isStopping() to decide whether to proceed:
* if (jq.isStopping())
* co_return;
* doNextChunk();
* }
* co_return;
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Using a stale or null runner.
* The runner shared_ptr must be valid and point to the CoroTaskRunner
* that owns the coroutine currently executing. Passing a runner from
* a different coroutine, or a default-constructed shared_ptr, is UB.
*
* BUG-RISK: Assuming resume happens on the same thread.
* After co_await JobQueueAwaiter, the coroutine resumes on whatever
* worker thread picks up the job. Do not rely on thread-local state
* unless it is managed through LocalValue (which CoroTaskRunner
* automatically swaps in/out).
*
* BUG-RISK: Ignoring the shutdown path.
* When the JobQueue is stopping, post() fails and await_suspend()
* returns false (coroutine does NOT actually suspend). The coroutine
* body continues immediately on the same thread. If your code after
* co_await assumes it was re-queued and is running on a worker thread,
* that assumption breaks during shutdown. Always handle the "JQ is
* stopping" case, either by checking jq.isStopping() or by letting
* the coroutine fall through to co_return naturally.
*
* DIFFERENCE from runner->suspend() + runner->post():
* JobQueueAwaiter combines both in one atomic operation. With the
* manual suspend()/post() pattern, there is a window between the
* two calls where an external event could race. JobQueueAwaiter
* removes that window -- onSuspend() and post() happen within the
* same await_suspend() call while the coroutine is guaranteed to
* be suspended. Prefer JobQueueAwaiter unless you need an external
* party to decide *when* to call post().
*/
struct JobQueueAwaiter
{
// The CoroTaskRunner that owns the currently executing coroutine.
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Increment nSuspend (equivalent to yield()) and schedule resume
* on the JobQueue (equivalent to post()). If the JobQueue is
* stopping, undoes the suspend count and returns false so the
* coroutine continues immediately and can clean up.
*
* @return true if coroutine should stay suspended (job posted);
* false if coroutine should continue (JQ stopping)
*/
bool
await_suspend(std::coroutine_handle<>)
{
runner->onSuspend();
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// don't actually suspend — the coroutine continues
// immediately so it can clean up and co_return.
runner->onUndoSuspend();
return false;
}
return true;
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -1,732 +0,0 @@
#pragma once
#include <xrpl/basics/Number.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/MPTIssue.h>
#include <xrpl/protocol/STLedgerEntry.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <cstdint>
#include <tuple>
#include <unordered_set>
namespace xrpl {
class ReadView;
#if GENERATING_DOCS
/**
* @brief Prototype for invariant check implementations.
*
* __THIS CLASS DOES NOT EXIST__ - or rather it exists in documentation only to
* communicate the interface required of any invariant checker. Any invariant
* check implementation should implement the public methods documented here.
*
*/
class InvariantChecker_PROTOTYPE
{
public:
explicit InvariantChecker_PROTOTYPE() = default;
/**
* @brief called for each ledger entry in the current transaction.
*
* @param isDelete true if the SLE is being deleted
* @param before ledger entry before modification by the transaction
* @param after ledger entry after modification by the transaction
*/
void
visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after);
/**
* @brief called after all ledger entries have been visited to determine
* the final status of the check
*
* @param tx the transaction being applied
* @param tec the current TER result of the transaction
* @param fee the fee actually charged for this transaction
* @param view a ReadView of the ledger being modified
* @param j journal for logging
*
* @return true if check passes, false if it fails
*/
bool
finalize(
STTx const& tx,
TER const tec,
XRPAmount const fee,
ReadView const& view,
beast::Journal const& j);
};
#endif
/**
* @brief Invariant: We should never charge a transaction a negative fee or a
* fee that is larger than what the transaction itself specifies.
*
* We can, in some circumstances, charge less.
*/
class TransactionFeeCheck
{
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: A transaction must not create XRP and should only destroy
* the XRP fee.
*
* We iterate through all account roots, payment channels and escrow entries
* that were modified and calculate the net change in XRP caused by the
* transactions.
*/
class XRPNotCreated
{
std::int64_t drops_ = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: we cannot remove an account ledger entry
*
* We iterate all account roots that were modified, and ensure that any that
* were present before the transaction was applied continue to be present
* afterwards unless they were explicitly deleted by a successful
* AccountDelete transaction.
*/
class AccountRootsNotDeleted
{
std::uint32_t accountsDeleted_ = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: a deleted account must not have any objects left
*
* We iterate all deleted account roots, and ensure that there are no
* objects left that are directly accessible with that account's ID.
*
* There should only be one deleted account, but that's checked by
* AccountRootsNotDeleted. This invariant will handle multiple deleted account
* roots without a problem.
*/
class AccountRootsDeletedClean
{
// Pair is <before, after>. Before is used for most of the checks, so that
// if, for example, an object ID field is cleared, but the object is not
// deleted, it can still be found. After is used specifically for any checks
// that are expected as part of the deletion, such as zeroing out the
// balance.
std::vector<std::pair<std::shared_ptr<SLE const>, std::shared_ptr<SLE const>>> accountsDeleted_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: An account XRP balance must be in XRP and take a value
* between 0 and INITIAL_XRP drops, inclusive.
*
* We iterate all account roots modified by the transaction and ensure that
* their XRP balances are reasonable.
*/
class XRPBalanceChecks
{
bool bad_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: corresponding modified ledger entries should match in type
* and added entries should be a valid type.
*/
class LedgerEntryTypesMatch
{
bool typeMismatch_ = false;
bool invalidTypeAdded_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Trust lines using XRP are not allowed.
*
* We iterate all the trust lines created by this transaction and ensure
* that they are against a valid issuer.
*/
class NoXRPTrustLines
{
bool xrpTrustLine_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Trust lines with deep freeze flag are not allowed if normal
* freeze flag is not set.
*
* We iterate all the trust lines created by this transaction and ensure
* that they don't have deep freeze flag set without normal freeze flag set.
*/
class NoDeepFreezeTrustLinesWithoutFreeze
{
bool deepFreezeWithoutFreeze_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: frozen trust line balance change is not allowed.
*
* We iterate all affected trust lines and ensure that they don't have
* unexpected change of balance if they're frozen.
*/
class TransfersNotFrozen
{
struct BalanceChange
{
std::shared_ptr<SLE const> const line;
int const balanceChangeSign;
};
struct IssuerChanges
{
std::vector<BalanceChange> senders;
std::vector<BalanceChange> receivers;
};
using ByIssuer = std::map<Issue, IssuerChanges>;
ByIssuer balanceChanges_;
std::map<AccountID, std::shared_ptr<SLE const> const> possibleIssuers_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
private:
bool
isValidEntry(std::shared_ptr<SLE const> const& before, std::shared_ptr<SLE const> const& after);
STAmount
calculateBalanceChange(
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after,
bool isDelete);
void
recordBalance(Issue const& issue, BalanceChange change);
void
recordBalanceChanges(std::shared_ptr<SLE const> const& after, STAmount const& balanceChange);
std::shared_ptr<SLE const>
findIssuer(AccountID const& issuerID, ReadView const& view);
bool
validateIssuerChanges(
std::shared_ptr<SLE const> const& issuer,
IssuerChanges const& changes,
STTx const& tx,
beast::Journal const& j,
bool enforce);
bool
validateFrozenState(
BalanceChange const& change,
bool high,
STTx const& tx,
beast::Journal const& j,
bool enforce,
bool globalFreeze);
};
/**
* @brief Invariant: offers should be for non-negative amounts and must not
* be XRP to XRP.
*
* Examine all offers modified by the transaction and ensure that there are
* no offers which contain negative amounts or which exchange XRP for XRP.
*/
class NoBadOffers
{
bool bad_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: an escrow entry must take a value between 0 and
* INITIAL_XRP drops exclusive.
*/
class NoZeroEscrow
{
bool bad_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: a new account root must be the consequence of a payment,
* must have the right starting sequence, and the payment
* may not create more than one new account root.
*/
class ValidNewAccountRoot
{
std::uint32_t accountsCreated_ = 0;
std::uint32_t accountSeq_ = 0;
bool pseudoAccount_ = false;
std::uint32_t flags_ = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Validates several invariants for NFToken pages.
*
* The following checks are made:
* - The page is correctly associated with the owner.
* - The page is correctly ordered between the next and previous links.
* - The page contains at least one and no more than 32 NFTokens.
* - The NFTokens on this page do not belong on a lower or higher page.
* - The NFTokens are correctly sorted on the page.
* - Each URI, if present, is not empty.
*/
class ValidNFTokenPage
{
bool badEntry_ = false;
bool badLink_ = false;
bool badSort_ = false;
bool badURI_ = false;
bool invalidSize_ = false;
bool deletedFinalPage_ = false;
bool deletedLink_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Validates counts of NFTokens after all transaction types.
*
* The following checks are made:
* - The number of minted or burned NFTokens can only be changed by
* NFTokenMint or NFTokenBurn transactions.
* - A successful NFTokenMint must increase the number of NFTokens.
* - A failed NFTokenMint must not change the number of minted NFTokens.
* - An NFTokenMint transaction cannot change the number of burned NFTokens.
* - A successful NFTokenBurn must increase the number of burned NFTokens.
* - A failed NFTokenBurn must not change the number of burned NFTokens.
* - An NFTokenBurn transaction cannot change the number of minted NFTokens.
*/
class NFTokenCountTracking
{
std::uint32_t beforeMintedTotal = 0;
std::uint32_t beforeBurnedTotal = 0;
std::uint32_t afterMintedTotal = 0;
std::uint32_t afterBurnedTotal = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Token holder's trustline balance cannot be negative after
* Clawback.
*
* We iterate all the trust lines affected by this transaction and ensure
* that no more than one trustline is modified, and also holder's balance is
* non-negative.
*/
class ValidClawback
{
std::uint32_t trustlinesChanged = 0;
std::uint32_t mptokensChanged = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
class ValidMPTIssuance
{
std::uint32_t mptIssuancesCreated_ = 0;
std::uint32_t mptIssuancesDeleted_ = 0;
std::uint32_t mptokensCreated_ = 0;
std::uint32_t mptokensDeleted_ = 0;
// non-MPT transactions may attempt to create
// MPToken by an issuer
bool mptCreatedByIssuer_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Permissioned Domains must have some rules and
* AcceptedCredentials must have length between 1 and 10 inclusive.
*
* Since only permissions constitute rules, an empty credentials list
* means that there are no rules and the invariant is violated.
*
* Credentials must be sorted and no duplicates allowed
*
*/
class ValidPermissionedDomain
{
struct SleStatus
{
std::size_t credentialsSize_{0};
bool isSorted_ = false;
bool isUnique_ = false;
bool isDelete_ = false;
};
std::vector<SleStatus> sleStatus_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Pseudo-accounts have valid and consistent properties
*
* Pseudo-accounts have certain properties, and some of those properties are
* unique to pseudo-accounts. Check that all pseudo-accounts are following the
* rules, and that only pseudo-accounts look like pseudo-accounts.
*
*/
class ValidPseudoAccounts
{
std::vector<std::string> errors_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
class ValidPermissionedDEX
{
bool regularOffers_ = false;
bool badHybrids_ = false;
hash_set<uint256> domains_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
class ValidAMM
{
std::optional<AccountID> ammAccount_;
std::optional<STAmount> lptAMMBalanceAfter_;
std::optional<STAmount> lptAMMBalanceBefore_;
bool ammPoolChanged_;
public:
enum class ZeroAllowed : bool { No = false, Yes = true };
ValidAMM() : ammPoolChanged_{false}
{
}
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
private:
bool
finalizeBid(bool enforce, beast::Journal const&) const;
bool
finalizeVote(bool enforce, beast::Journal const&) const;
bool
finalizeCreate(STTx const&, ReadView const&, bool enforce, beast::Journal const&) const;
bool
finalizeDelete(bool enforce, TER res, beast::Journal const&) const;
bool
finalizeDeposit(STTx const&, ReadView const&, bool enforce, beast::Journal const&) const;
// Includes clawback
bool
finalizeWithdraw(STTx const&, ReadView const&, bool enforce, beast::Journal const&) const;
bool
finalizeDEX(bool enforce, beast::Journal const&) const;
bool
generalInvariant(STTx const&, ReadView const&, ZeroAllowed zeroAllowed, beast::Journal const&)
const;
};
/**
* @brief Invariants: Some fields are unmodifiable
*
* Check that any fields specified as unmodifiable are not modified when the
* object is modified. Creation and deletion are ignored.
*
*/
class NoModifiedUnmodifiableFields
{
// Pair is <before, after>.
std::set<std::pair<SLE::const_pointer, SLE::const_pointer>> changedEntries_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Loan brokers are internally consistent
*
* 1. If `LoanBroker.OwnerCount = 0` the `DirectoryNode` will have at most one
* node (the root), which will only hold entries for `RippleState` or
* `MPToken` objects.
*
*/
class ValidLoanBroker
{
// Not all of these elements will necessarily be populated. Remaining items
// will be looked up as needed.
struct BrokerInfo
{
SLE::const_pointer brokerBefore = nullptr;
// After is used for most of the checks, except
// those that check changed values.
SLE::const_pointer brokerAfter = nullptr;
};
// Collect all the LoanBrokers found directly or indirectly through
// pseudo-accounts. Key is the brokerID / index. It will be used to find the
// LoanBroker object if brokerBefore and brokerAfter are nullptr
std::map<uint256, BrokerInfo> brokers_;
// Collect all the modified trust lines. Their high and low accounts will be
// loaded to look for LoanBroker pseudo-accounts.
std::vector<SLE::const_pointer> lines_;
// Collect all the modified MPTokens. Their accounts will be loaded to look
// for LoanBroker pseudo-accounts.
std::vector<SLE::const_pointer> mpts_;
bool
goodZeroDirectory(ReadView const& view, SLE::const_ref dir, beast::Journal const& j) const;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Loans are internally consistent
*
* 1. If `Loan.PaymentRemaining = 0` then `Loan.PrincipalOutstanding = 0`
*
*/
class ValidLoan
{
// Pair is <before, after>. After is used for most of the checks, except
// those that check changed values.
std::vector<std::pair<SLE::const_pointer, SLE::const_pointer>> loans_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/*
* @brief Invariants: Vault object and MPTokenIssuance for vault shares
*
* - vault deleted and vault created is empty
* - vault created must be linked to pseudo-account for shares and assets
* - vault must have MPTokenIssuance for shares
* - vault without shares outstanding must have no shares
* - loss unrealized does not exceed the difference between assets total and
* assets available
* - assets available do not exceed assets total
* - vault deposit increases assets and share issuance, and adds to:
* total assets, assets available, shares outstanding
* - vault withdrawal and clawback reduce assets and share issuance, and
* subtracts from: total assets, assets available, shares outstanding
* - vault set must not alter the vault assets or shares balance
* - no vault transaction can change loss unrealized (it's updated by loan
* transactions)
*
*/
class ValidVault
{
Number static constexpr zero{};
struct Vault final
{
uint256 key = beast::zero;
Asset asset = {};
AccountID pseudoId = {};
AccountID owner = {};
uint192 shareMPTID = beast::zero;
Number assetsTotal = 0;
Number assetsAvailable = 0;
Number assetsMaximum = 0;
Number lossUnrealized = 0;
Vault static make(SLE const&);
};
struct Shares final
{
MPTIssue share = {};
std::uint64_t sharesTotal = 0;
std::uint64_t sharesMaximum = 0;
Shares static make(SLE const&);
};
std::vector<Vault> afterVault_ = {};
std::vector<Shares> afterMPTs_ = {};
std::vector<Vault> beforeVault_ = {};
std::vector<Shares> beforeMPTs_ = {};
std::unordered_map<uint256, Number> deltas_ = {};
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
// additional invariant checks can be declared above and then added to this
// tuple
using InvariantChecks = std::tuple<
TransactionFeeCheck,
AccountRootsNotDeleted,
AccountRootsDeletedClean,
LedgerEntryTypesMatch,
XRPBalanceChecks,
XRPNotCreated,
NoXRPTrustLines,
NoDeepFreezeTrustLinesWithoutFreeze,
TransfersNotFrozen,
NoBadOffers,
NoZeroEscrow,
ValidNewAccountRoot,
ValidNFTokenPage,
NFTokenCountTracking,
ValidClawback,
ValidMPTIssuance,
ValidPermissionedDomain,
ValidPermissionedDEX,
ValidAMM,
NoModifiedUnmodifiableFields,
ValidPseudoAccounts,
ValidLoanBroker,
ValidLoan,
ValidVault>;
/**
* @brief get a tuple of all invariant checks
*
* @return std::tuple of instances that implement the required invariant check
* methods
*
* @see xrpl::InvariantChecker_PROTOTYPE
*/
inline InvariantChecks
getInvariantChecks()
{
return InvariantChecks{};
}
} // namespace xrpl

View File

@@ -0,0 +1,53 @@
#pragma once
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STAmount.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <optional>
namespace xrpl {
class ValidAMM
{
std::optional<AccountID> ammAccount_;
std::optional<STAmount> lptAMMBalanceAfter_;
std::optional<STAmount> lptAMMBalanceBefore_;
bool ammPoolChanged_;
public:
enum class ZeroAllowed : bool { No = false, Yes = true };
ValidAMM() : ammPoolChanged_{false}
{
}
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
private:
bool
finalizeBid(bool enforce, beast::Journal const&) const;
bool
finalizeVote(bool enforce, beast::Journal const&) const;
bool
finalizeCreate(STTx const&, ReadView const&, bool enforce, beast::Journal const&) const;
bool
finalizeDelete(bool enforce, TER res, beast::Journal const&) const;
bool
finalizeDeposit(STTx const&, ReadView const&, bool enforce, beast::Journal const&) const;
// Includes clawback
bool
finalizeWithdraw(STTx const&, ReadView const&, bool enforce, beast::Journal const&) const;
bool
finalizeDEX(bool enforce, beast::Journal const&) const;
bool
generalInvariant(STTx const&, ReadView const&, ZeroAllowed zeroAllowed, beast::Journal const&)
const;
};
} // namespace xrpl

View File

@@ -0,0 +1,84 @@
#pragma once
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/Issue.h>
#include <xrpl/protocol/STAmount.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <map>
#include <vector>
namespace xrpl {
/**
* @brief Invariant: frozen trust line balance change is not allowed.
*
* We iterate all affected trust lines and ensure that they don't have
* unexpected change of balance if they're frozen.
*/
class TransfersNotFrozen
{
struct BalanceChange
{
std::shared_ptr<SLE const> const line;
int const balanceChangeSign;
};
struct IssuerChanges
{
std::vector<BalanceChange> senders;
std::vector<BalanceChange> receivers;
};
using ByIssuer = std::map<Issue, IssuerChanges>;
ByIssuer balanceChanges_;
std::map<AccountID, std::shared_ptr<SLE const> const> possibleIssuers_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
private:
bool
isValidEntry(std::shared_ptr<SLE const> const& before, std::shared_ptr<SLE const> const& after);
STAmount
calculateBalanceChange(
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after,
bool isDelete);
void
recordBalance(Issue const& issue, BalanceChange change);
void
recordBalanceChanges(std::shared_ptr<SLE const> const& after, STAmount const& balanceChange);
std::shared_ptr<SLE const>
findIssuer(AccountID const& issuerID, ReadView const& view);
bool
validateIssuerChanges(
std::shared_ptr<SLE const> const& issuer,
IssuerChanges const& changes,
STTx const& tx,
beast::Journal const& j,
bool enforce);
bool
validateFrozenState(
BalanceChange const& change,
bool high,
STTx const& tx,
beast::Journal const& j,
bool enforce,
bool globalFreeze);
};
} // namespace xrpl

View File

@@ -0,0 +1,385 @@
#pragma once
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/tx/invariants/AMMInvariant.h>
#include <xrpl/tx/invariants/FreezeInvariant.h>
#include <xrpl/tx/invariants/LoanInvariant.h>
#include <xrpl/tx/invariants/MPTInvariant.h>
#include <xrpl/tx/invariants/NFTInvariant.h>
#include <xrpl/tx/invariants/PermissionedDEXInvariant.h>
#include <xrpl/tx/invariants/PermissionedDomainInvariant.h>
#include <xrpl/tx/invariants/VaultInvariant.h>
#include <cstdint>
#include <tuple>
namespace xrpl {
#if GENERATING_DOCS
/**
* @brief Prototype for invariant check implementations.
*
* __THIS CLASS DOES NOT EXIST__ - or rather it exists in documentation only to
* communicate the interface required of any invariant checker. Any invariant
* check implementation should implement the public methods documented here.
*
*/
class InvariantChecker_PROTOTYPE
{
public:
explicit InvariantChecker_PROTOTYPE() = default;
/**
* @brief called for each ledger entry in the current transaction.
*
* @param isDelete true if the SLE is being deleted
* @param before ledger entry before modification by the transaction
* @param after ledger entry after modification by the transaction
*/
void
visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after);
/**
* @brief called after all ledger entries have been visited to determine
* the final status of the check
*
* @param tx the transaction being applied
* @param tec the current TER result of the transaction
* @param fee the fee actually charged for this transaction
* @param view a ReadView of the ledger being modified
* @param j journal for logging
*
* @return true if check passes, false if it fails
*/
bool
finalize(
STTx const& tx,
TER const tec,
XRPAmount const fee,
ReadView const& view,
beast::Journal const& j);
};
#endif
/**
* @brief Invariant: We should never charge a transaction a negative fee or a
* fee that is larger than what the transaction itself specifies.
*
* We can, in some circumstances, charge less.
*/
class TransactionFeeCheck
{
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: A transaction must not create XRP and should only destroy
* the XRP fee.
*
* We iterate through all account roots, payment channels and escrow entries
* that were modified and calculate the net change in XRP caused by the
* transactions.
*/
class XRPNotCreated
{
std::int64_t drops_ = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: we cannot remove an account ledger entry
*
* We iterate all account roots that were modified, and ensure that any that
* were present before the transaction was applied continue to be present
* afterwards unless they were explicitly deleted by a successful
* AccountDelete transaction.
*/
class AccountRootsNotDeleted
{
std::uint32_t accountsDeleted_ = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: a deleted account must not have any objects left
*
* We iterate all deleted account roots, and ensure that there are no
* objects left that are directly accessible with that account's ID.
*
* There should only be one deleted account, but that's checked by
* AccountRootsNotDeleted. This invariant will handle multiple deleted account
* roots without a problem.
*/
class AccountRootsDeletedClean
{
// Pair is <before, after>. Before is used for most of the checks, so that
// if, for example, an object ID field is cleared, but the object is not
// deleted, it can still be found. After is used specifically for any checks
// that are expected as part of the deletion, such as zeroing out the
// balance.
std::vector<std::pair<std::shared_ptr<SLE const>, std::shared_ptr<SLE const>>> accountsDeleted_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: An account XRP balance must be in XRP and take a value
* between 0 and INITIAL_XRP drops, inclusive.
*
* We iterate all account roots modified by the transaction and ensure that
* their XRP balances are reasonable.
*/
class XRPBalanceChecks
{
bool bad_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: corresponding modified ledger entries should match in type
* and added entries should be a valid type.
*/
class LedgerEntryTypesMatch
{
bool typeMismatch_ = false;
bool invalidTypeAdded_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Trust lines using XRP are not allowed.
*
* We iterate all the trust lines created by this transaction and ensure
* that they are against a valid issuer.
*/
class NoXRPTrustLines
{
bool xrpTrustLine_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Trust lines with deep freeze flag are not allowed if normal
* freeze flag is not set.
*
* We iterate all the trust lines created by this transaction and ensure
* that they don't have deep freeze flag set without normal freeze flag set.
*/
class NoDeepFreezeTrustLinesWithoutFreeze
{
bool deepFreezeWithoutFreeze_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: offers should be for non-negative amounts and must not
* be XRP to XRP.
*
* Examine all offers modified by the transaction and ensure that there are
* no offers which contain negative amounts or which exchange XRP for XRP.
*/
class NoBadOffers
{
bool bad_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: an escrow entry must take a value between 0 and
* INITIAL_XRP drops exclusive.
*/
class NoZeroEscrow
{
bool bad_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: a new account root must be the consequence of a payment,
* must have the right starting sequence, and the payment
* may not create more than one new account root.
*/
class ValidNewAccountRoot
{
std::uint32_t accountsCreated_ = 0;
std::uint32_t accountSeq_ = 0;
bool pseudoAccount_ = false;
std::uint32_t flags_ = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Token holder's trustline balance cannot be negative after
* Clawback.
*
* We iterate all the trust lines affected by this transaction and ensure
* that no more than one trustline is modified, and also holder's balance is
* non-negative.
*/
class ValidClawback
{
std::uint32_t trustlinesChanged = 0;
std::uint32_t mptokensChanged = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Pseudo-accounts have valid and consistent properties
*
* Pseudo-accounts have certain properties, and some of those properties are
* unique to pseudo-accounts. Check that all pseudo-accounts are following the
* rules, and that only pseudo-accounts look like pseudo-accounts.
*
*/
class ValidPseudoAccounts
{
std::vector<std::string> errors_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Some fields are unmodifiable
*
* Check that any fields specified as unmodifiable are not modified when the
* object is modified. Creation and deletion are ignored.
*
*/
class NoModifiedUnmodifiableFields
{
// Pair is <before, after>.
std::set<std::pair<SLE::const_pointer, SLE::const_pointer>> changedEntries_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
// additional invariant checks can be declared above and then added to this
// tuple
using InvariantChecks = std::tuple<
TransactionFeeCheck,
AccountRootsNotDeleted,
AccountRootsDeletedClean,
LedgerEntryTypesMatch,
XRPBalanceChecks,
XRPNotCreated,
NoXRPTrustLines,
NoDeepFreezeTrustLinesWithoutFreeze,
TransfersNotFrozen,
NoBadOffers,
NoZeroEscrow,
ValidNewAccountRoot,
ValidNFTokenPage,
NFTokenCountTracking,
ValidClawback,
ValidMPTIssuance,
ValidPermissionedDomain,
ValidPermissionedDEX,
ValidAMM,
NoModifiedUnmodifiableFields,
ValidPseudoAccounts,
ValidLoanBroker,
ValidLoan,
ValidVault>;
/**
* @brief get a tuple of all invariant checks
*
* @return std::tuple of instances that implement the required invariant check
* methods
*
* @see xrpl::InvariantChecker_PROTOTYPE
*/
inline InvariantChecks
getInvariantChecks()
{
return InvariantChecks{};
}
} // namespace xrpl

View File

@@ -0,0 +1,60 @@
#pragma once
#include <xrpl/protocol/STTx.h>
#include <type_traits>
namespace xrpl {
/*
assert(enforce)
There are several asserts (or XRPL_ASSERTs) in invariant check files that check
a variable named `enforce` when an invariant fails. At first glance, those
asserts may look incorrect, but they are not.
Those asserts take advantage of two facts:
1. `asserts` are not (normally) executed in release builds.
2. Invariants should *never* fail, except in tests that specifically modify
the open ledger to break them.
This makes `assert(enforce)` sort of a second-layer of invariant enforcement
aimed at _developers_. It's designed to fire if a developer writes code that
violates an invariant, and runs it in unit tests or a develop build that _does
not have the relevant amendments enabled_. It's intentionally a pain in the neck
so that bad code gets caught and fixed as early as possible.
*/
enum Privilege {
noPriv = 0x0000, // The transaction can not do any of the enumerated operations
createAcct = 0x0001, // The transaction can create a new ACCOUNT_ROOT object.
createPseudoAcct = 0x0002, // The transaction can create a pseudo account,
// which implies createAcct
mustDeleteAcct = 0x0004, // The transaction must delete an ACCOUNT_ROOT object
mayDeleteAcct = 0x0008, // The transaction may delete an ACCOUNT_ROOT
// object, but does not have to
overrideFreeze = 0x0010, // The transaction can override some freeze rules
changeNFTCounts = 0x0020, // The transaction can mint or burn an NFT
createMPTIssuance = 0x0040, // The transaction can create a new MPT issuance
destroyMPTIssuance = 0x0080, // The transaction can destroy an MPT issuance
mustAuthorizeMPT = 0x0100, // The transaction MUST create or delete an MPT
// object (except by issuer)
mayAuthorizeMPT = 0x0200, // The transaction MAY create or delete an MPT
// object (except by issuer)
mayDeleteMPT = 0x0400, // The transaction MAY delete an MPT object. May not create.
mustModifyVault = 0x0800, // The transaction must modify, delete or create, a vault
mayModifyVault = 0x1000, // The transaction MAY modify, delete or create, a vault
};
constexpr Privilege
operator|(Privilege lhs, Privilege rhs)
{
return safe_cast<Privilege>(
safe_cast<std::underlying_type_t<Privilege>>(lhs) |
safe_cast<std::underlying_type_t<Privilege>>(rhs));
}
bool
hasPrivilege(STTx const& tx, Privilege priv);
} // namespace xrpl

View File

@@ -0,0 +1,75 @@
#pragma once
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <map>
#include <vector>
namespace xrpl {
/**
* @brief Invariants: Loan brokers are internally consistent
*
* 1. If `LoanBroker.OwnerCount = 0` the `DirectoryNode` will have at most one
* node (the root), which will only hold entries for `RippleState` or
* `MPToken` objects.
*
*/
class ValidLoanBroker
{
// Not all of these elements will necessarily be populated. Remaining items
// will be looked up as needed.
struct BrokerInfo
{
SLE::const_pointer brokerBefore = nullptr;
// After is used for most of the checks, except
// those that check changed values.
SLE::const_pointer brokerAfter = nullptr;
};
// Collect all the LoanBrokers found directly or indirectly through
// pseudo-accounts. Key is the brokerID / index. It will be used to find the
// LoanBroker object if brokerBefore and brokerAfter are nullptr
std::map<uint256, BrokerInfo> brokers_;
// Collect all the modified trust lines. Their high and low accounts will be
// loaded to look for LoanBroker pseudo-accounts.
std::vector<SLE::const_pointer> lines_;
// Collect all the modified MPTokens. Their accounts will be loaded to look
// for LoanBroker pseudo-accounts.
std::vector<SLE::const_pointer> mpts_;
bool
goodZeroDirectory(ReadView const& view, SLE::const_ref dir, beast::Journal const& j) const;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariants: Loans are internally consistent
*
* 1. If `Loan.PaymentRemaining = 0` then `Loan.PrincipalOutstanding = 0`
*
*/
class ValidLoan
{
// Pair is <before, after>. After is used for most of the checks, except
// those that check changed values.
std::vector<std::pair<SLE::const_pointer, SLE::const_pointer>> loans_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
} // namespace xrpl

View File

@@ -0,0 +1,31 @@
#pragma once
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <cstdint>
namespace xrpl {
class ValidMPTIssuance
{
std::uint32_t mptIssuancesCreated_ = 0;
std::uint32_t mptIssuancesDeleted_ = 0;
std::uint32_t mptokensCreated_ = 0;
std::uint32_t mptokensDeleted_ = 0;
// non-MPT transactions may attempt to create
// MPToken by an issuer
bool mptCreatedByIssuer_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
} // namespace xrpl

View File

@@ -0,0 +1,70 @@
#pragma once
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <cstdint>
namespace xrpl {
/**
* @brief Invariant: Validates several invariants for NFToken pages.
*
* The following checks are made:
* - The page is correctly associated with the owner.
* - The page is correctly ordered between the next and previous links.
* - The page contains at least one and no more than 32 NFTokens.
* - The NFTokens on this page do not belong on a lower or higher page.
* - The NFTokens are correctly sorted on the page.
* - Each URI, if present, is not empty.
*/
class ValidNFTokenPage
{
bool badEntry_ = false;
bool badLink_ = false;
bool badSort_ = false;
bool badURI_ = false;
bool invalidSize_ = false;
bool deletedFinalPage_ = false;
bool deletedLink_ = false;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
/**
* @brief Invariant: Validates counts of NFTokens after all transaction types.
*
* The following checks are made:
* - The number of minted or burned NFTokens can only be changed by
* NFTokenMint or NFTokenBurn transactions.
* - A successful NFTokenMint must increase the number of NFTokens.
* - A failed NFTokenMint must not change the number of minted NFTokens.
* - An NFTokenMint transaction cannot change the number of burned NFTokens.
* - A successful NFTokenBurn must increase the number of burned NFTokens.
* - A failed NFTokenBurn must not change the number of burned NFTokens.
* - An NFTokenBurn transaction cannot change the number of minted NFTokens.
*/
class NFTokenCountTracking
{
std::uint32_t beforeMintedTotal = 0;
std::uint32_t beforeBurnedTotal = 0;
std::uint32_t afterMintedTotal = 0;
std::uint32_t afterBurnedTotal = 0;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
} // namespace xrpl

View File

@@ -0,0 +1,25 @@
#pragma once
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
namespace xrpl {
class ValidPermissionedDEX
{
bool regularOffers_ = false;
bool badHybrids_ = false;
hash_set<uint256> domains_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
} // namespace xrpl

View File

@@ -0,0 +1,41 @@
#pragma once
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <vector>
namespace xrpl {
/**
* @brief Invariants: Permissioned Domains must have some rules and
* AcceptedCredentials must have length between 1 and 10 inclusive.
*
* Since only permissions constitute rules, an empty credentials list
* means that there are no rules and the invariant is violated.
*
* Credentials must be sorted and no duplicates allowed
*
*/
class ValidPermissionedDomain
{
struct SleStatus
{
std::size_t credentialsSize_{0};
bool isSorted_ = false;
bool isUnique_ = false;
bool isDelete_ = false;
};
std::vector<SleStatus> sleStatus_;
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
} // namespace xrpl

View File

@@ -0,0 +1,77 @@
#pragma once
#include <xrpl/basics/Number.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/MPTIssue.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TER.h>
#include <unordered_map>
#include <vector>
namespace xrpl {
/*
* @brief Invariants: Vault object and MPTokenIssuance for vault shares
*
* - vault deleted and vault created is empty
* - vault created must be linked to pseudo-account for shares and assets
* - vault must have MPTokenIssuance for shares
* - vault without shares outstanding must have no shares
* - loss unrealized does not exceed the difference between assets total and
* assets available
* - assets available do not exceed assets total
* - vault deposit increases assets and share issuance, and adds to:
* total assets, assets available, shares outstanding
* - vault withdrawal and clawback reduce assets and share issuance, and
* subtracts from: total assets, assets available, shares outstanding
* - vault set must not alter the vault assets or shares balance
* - no vault transaction can change loss unrealized (it's updated by loan
* transactions)
*
*/
class ValidVault
{
Number static constexpr zero{};
struct Vault final
{
uint256 key = beast::zero;
Asset asset = {};
AccountID pseudoId = {};
AccountID owner = {};
uint192 shareMPTID = beast::zero;
Number assetsTotal = 0;
Number assetsAvailable = 0;
Number assetsMaximum = 0;
Number lossUnrealized = 0;
Vault static make(SLE const&);
};
struct Shares final
{
MPTIssue share = {};
std::uint64_t sharesTotal = 0;
std::uint64_t sharesMaximum = 0;
Shares static make(SLE const&);
};
std::vector<Vault> afterVault_ = {};
std::vector<Shares> afterMPTs_ = {};
std::vector<Vault> beforeVault_ = {};
std::vector<Shares> beforeMPTs_ = {};
std::unordered_map<uint256, Number> deltas_ = {};
public:
void
visitEntry(bool, std::shared_ptr<SLE const> const&, std::shared_ptr<SLE const> const&);
bool
finalize(STTx const&, TER const, XRPAmount const, ReadView const&, beast::Journal const&);
};
} // namespace xrpl

View File

@@ -1,8 +1,9 @@
#include <xrpl/tx/ApplyContext.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/json/to_string.h>
#include <xrpl/tx/ApplyContext.h>
#include <xrpl/tx/InvariantCheck.h>
#include <xrpl/tx/invariants/InvariantCheck.h>
namespace xrpl {

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,305 @@
#include <xrpl/tx/invariants/AMMInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/tx/transactors/AMM/AMMHelpers.h>
#include <xrpl/tx/transactors/AMM/AMMUtils.h>
namespace xrpl {
void
ValidAMM::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (isDelete)
return;
if (after)
{
auto const type = after->getType();
// AMM object changed
if (type == ltAMM)
{
ammAccount_ = after->getAccountID(sfAccount);
lptAMMBalanceAfter_ = after->getFieldAmount(sfLPTokenBalance);
}
// AMM pool changed
else if (
(type == ltRIPPLE_STATE && after->getFlags() & lsfAMMNode) ||
(type == ltACCOUNT_ROOT && after->isFieldPresent(sfAMMID)))
{
ammPoolChanged_ = true;
}
}
if (before)
{
// AMM object changed
if (before->getType() == ltAMM)
{
lptAMMBalanceBefore_ = before->getFieldAmount(sfLPTokenBalance);
}
}
}
static bool
validBalances(
STAmount const& amount,
STAmount const& amount2,
STAmount const& lptAMMBalance,
ValidAMM::ZeroAllowed zeroAllowed)
{
bool const positive =
amount > beast::zero && amount2 > beast::zero && lptAMMBalance > beast::zero;
if (zeroAllowed == ValidAMM::ZeroAllowed::Yes)
return positive ||
(amount == beast::zero && amount2 == beast::zero && lptAMMBalance == beast::zero);
return positive;
}
bool
ValidAMM::finalizeVote(bool enforce, beast::Journal const& j) const
{
if (lptAMMBalanceAfter_ != lptAMMBalanceBefore_ || ammPoolChanged_)
{
// LPTokens and the pool can not change on vote
// LCOV_EXCL_START
JLOG(j.error()) << "AMMVote invariant failed: " << lptAMMBalanceBefore_.value_or(STAmount{})
<< " " << lptAMMBalanceAfter_.value_or(STAmount{}) << " "
<< ammPoolChanged_;
if (enforce)
return false;
// LCOV_EXCL_STOP
}
return true;
}
bool
ValidAMM::finalizeBid(bool enforce, beast::Journal const& j) const
{
if (ammPoolChanged_)
{
// The pool can not change on bid
// LCOV_EXCL_START
JLOG(j.error()) << "AMMBid invariant failed: pool changed";
if (enforce)
return false;
// LCOV_EXCL_STOP
}
// LPTokens are burnt, therefore there should be fewer LPTokens
else if (
lptAMMBalanceBefore_ && lptAMMBalanceAfter_ &&
(*lptAMMBalanceAfter_ > *lptAMMBalanceBefore_ || *lptAMMBalanceAfter_ <= beast::zero))
{
// LCOV_EXCL_START
JLOG(j.error()) << "AMMBid invariant failed: " << *lptAMMBalanceBefore_ << " "
<< *lptAMMBalanceAfter_;
if (enforce)
return false;
// LCOV_EXCL_STOP
}
return true;
}
bool
ValidAMM::finalizeCreate(
STTx const& tx,
ReadView const& view,
bool enforce,
beast::Journal const& j) const
{
if (!ammAccount_)
{
// LCOV_EXCL_START
JLOG(j.error()) << "AMMCreate invariant failed: AMM object is not created";
if (enforce)
return false;
// LCOV_EXCL_STOP
}
else
{
auto const [amount, amount2] = ammPoolHolds(
view,
*ammAccount_,
tx[sfAmount].get<Issue>(),
tx[sfAmount2].get<Issue>(),
fhIGNORE_FREEZE,
j);
// Create invariant:
// sqrt(amount * amount2) == LPTokens
// all balances are greater than zero
if (!validBalances(amount, amount2, *lptAMMBalanceAfter_, ZeroAllowed::No) ||
ammLPTokens(amount, amount2, lptAMMBalanceAfter_->issue()) != *lptAMMBalanceAfter_)
{
JLOG(j.error()) << "AMMCreate invariant failed: " << amount << " " << amount2 << " "
<< *lptAMMBalanceAfter_;
if (enforce)
return false;
}
}
return true;
}
bool
ValidAMM::finalizeDelete(bool enforce, TER res, beast::Journal const& j) const
{
if (ammAccount_)
{
// LCOV_EXCL_START
std::string const msg = (res == tesSUCCESS) ? "AMM object is not deleted on tesSUCCESS"
: "AMM object is changed on tecINCOMPLETE";
JLOG(j.error()) << "AMMDelete invariant failed: " << msg;
if (enforce)
return false;
// LCOV_EXCL_STOP
}
return true;
}
bool
ValidAMM::finalizeDEX(bool enforce, beast::Journal const& j) const
{
if (ammAccount_)
{
// LCOV_EXCL_START
JLOG(j.error()) << "AMM swap invariant failed: AMM object changed";
if (enforce)
return false;
// LCOV_EXCL_STOP
}
return true;
}
bool
ValidAMM::generalInvariant(
xrpl::STTx const& tx,
xrpl::ReadView const& view,
ZeroAllowed zeroAllowed,
beast::Journal const& j) const
{
auto const [amount, amount2] = ammPoolHolds(
view,
*ammAccount_,
tx[sfAsset].get<Issue>(),
tx[sfAsset2].get<Issue>(),
fhIGNORE_FREEZE,
j);
// Deposit and Withdrawal invariant:
// sqrt(amount * amount2) >= LPTokens
// all balances are greater than zero
// unless on last withdrawal
auto const poolProductMean = root2(amount * amount2);
bool const nonNegativeBalances =
validBalances(amount, amount2, *lptAMMBalanceAfter_, zeroAllowed);
bool const strongInvariantCheck = poolProductMean >= *lptAMMBalanceAfter_;
// Allow for a small relative error if strongInvariantCheck fails
auto weakInvariantCheck = [&]() {
return *lptAMMBalanceAfter_ != beast::zero &&
withinRelativeDistance(poolProductMean, Number{*lptAMMBalanceAfter_}, Number{1, -11});
};
if (!nonNegativeBalances || (!strongInvariantCheck && !weakInvariantCheck()))
{
JLOG(j.error()) << "AMM " << tx.getTxnType()
<< " invariant failed: " << tx.getHash(HashPrefix::transactionID) << " "
<< ammPoolChanged_ << " " << amount << " " << amount2 << " "
<< poolProductMean << " " << lptAMMBalanceAfter_->getText() << " "
<< ((*lptAMMBalanceAfter_ == beast::zero)
? Number{1}
: ((*lptAMMBalanceAfter_ - poolProductMean) / poolProductMean));
return false;
}
return true;
}
bool
ValidAMM::finalizeDeposit(
xrpl::STTx const& tx,
xrpl::ReadView const& view,
bool enforce,
beast::Journal const& j) const
{
if (!ammAccount_)
{
// LCOV_EXCL_START
JLOG(j.error()) << "AMMDeposit invariant failed: AMM object is deleted";
if (enforce)
return false;
// LCOV_EXCL_STOP
}
else if (!generalInvariant(tx, view, ZeroAllowed::No, j) && enforce)
return false;
return true;
}
bool
ValidAMM::finalizeWithdraw(
xrpl::STTx const& tx,
xrpl::ReadView const& view,
bool enforce,
beast::Journal const& j) const
{
if (!ammAccount_)
{
// Last Withdraw or Clawback deleted AMM
}
else if (!generalInvariant(tx, view, ZeroAllowed::Yes, j))
{
if (enforce)
return false;
}
return true;
}
bool
ValidAMM::finalize(
STTx const& tx,
TER const result,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
// Delete may return tecINCOMPLETE if there are too many
// trustlines to delete.
if (result != tesSUCCESS && result != tecINCOMPLETE)
return true;
bool const enforce = view.rules().enabled(fixAMMv1_3);
switch (tx.getTxnType())
{
case ttAMM_CREATE:
return finalizeCreate(tx, view, enforce, j);
case ttAMM_DEPOSIT:
return finalizeDeposit(tx, view, enforce, j);
case ttAMM_CLAWBACK:
case ttAMM_WITHDRAW:
return finalizeWithdraw(tx, view, enforce, j);
case ttAMM_BID:
return finalizeBid(enforce, j);
case ttAMM_VOTE:
return finalizeVote(enforce, j);
case ttAMM_DELETE:
return finalizeDelete(enforce, result, j);
case ttCHECK_CASH:
case ttOFFER_CREATE:
case ttPAYMENT:
return finalizeDEX(enforce, j);
default:
break;
}
return true;
}
} // namespace xrpl

View File

@@ -0,0 +1,278 @@
#include <xrpl/tx/invariants/FreezeInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/tx/invariants/InvariantCheckPrivilege.h>
namespace xrpl {
void
TransfersNotFrozen::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
/*
* A trust line freeze state alone doesn't determine if a transfer is
* frozen. The transfer must be examined "end-to-end" because both sides of
* the transfer may have different freeze states and freeze impact depends
* on the transfer direction. This is why first we need to track the
* transfers using IssuerChanges senders/receivers.
*
* Only in validateIssuerChanges, after we collected all changes can we
* determine if the transfer is valid.
*/
if (!isValidEntry(before, after))
{
return;
}
auto const balanceChange = calculateBalanceChange(before, after, isDelete);
if (balanceChange.signum() == 0)
{
return;
}
recordBalanceChanges(after, balanceChange);
}
bool
TransfersNotFrozen::finalize(
STTx const& tx,
TER const ter,
XRPAmount const fee,
ReadView const& view,
beast::Journal const& j)
{
/*
* We check this invariant regardless of deep freeze amendment status,
* allowing for detection and logging of potential issues even when the
* amendment is disabled.
*
* If an exploit that allows moving frozen assets is discovered,
* we can alert operators who monitor fatal messages and trigger assert in
* debug builds for an early warning.
*
* In an unlikely event that an exploit is found, this early detection
* enables encouraging the UNL to expedite deep freeze amendment activation
* or deploy hotfixes via new amendments. In case of a new amendment, we'd
* only have to change this line setting 'enforce' variable.
* enforce = view.rules().enabled(featureDeepFreeze) ||
* view.rules().enabled(fixFreezeExploit);
*/
[[maybe_unused]] bool const enforce = view.rules().enabled(featureDeepFreeze);
for (auto const& [issue, changes] : balanceChanges_)
{
auto const issuerSle = findIssuer(issue.account, view);
// It should be impossible for the issuer to not be found, but check
// just in case so rippled doesn't crash in release.
if (!issuerSle)
{
// The comment above starting with "assert(enforce)" explains this
// assert.
XRPL_ASSERT(
enforce,
"xrpl::TransfersNotFrozen::finalize : enforce "
"invariant.");
if (enforce)
{
return false;
}
continue;
}
if (!validateIssuerChanges(issuerSle, changes, tx, j, enforce))
{
return false;
}
}
return true;
}
bool
TransfersNotFrozen::isValidEntry(
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
// `after` can never be null, even if the trust line is deleted.
XRPL_ASSERT(after, "xrpl::TransfersNotFrozen::isValidEntry : valid after.");
if (!after)
{
return false;
}
if (after->getType() == ltACCOUNT_ROOT)
{
possibleIssuers_.emplace(after->at(sfAccount), after);
return false;
}
/* While LedgerEntryTypesMatch invariant also checks types, all invariants
* are processed regardless of previous failures.
*
* This type check is still necessary here because it prevents potential
* issues in subsequent processing.
*/
return after->getType() == ltRIPPLE_STATE && (!before || before->getType() == ltRIPPLE_STATE);
}
STAmount
TransfersNotFrozen::calculateBalanceChange(
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after,
bool isDelete)
{
auto const getBalance = [](auto const& line, auto const& other, bool zero) {
STAmount amt = line ? line->at(sfBalance) : other->at(sfBalance).zeroed();
return zero ? amt.zeroed() : amt;
};
/* Trust lines can be created dynamically by other transactions such as
* Payment and OfferCreate that cross offers. Such trust line won't be
* created frozen, but the sender might be, so the starting balance must be
* treated as zero.
*/
auto const balanceBefore = getBalance(before, after, false);
/* Same as above, trust lines can be dynamically deleted, and for frozen
* trust lines, payments not involving the issuer must be blocked. This is
* achieved by treating the final balance as zero when isDelete=true to
* ensure frozen line restrictions are enforced even during deletion.
*/
auto const balanceAfter = getBalance(after, before, isDelete);
return balanceAfter - balanceBefore;
}
void
TransfersNotFrozen::recordBalance(Issue const& issue, BalanceChange change)
{
XRPL_ASSERT(
change.balanceChangeSign,
"xrpl::TransfersNotFrozen::recordBalance : valid trustline "
"balance sign.");
auto& changes = balanceChanges_[issue];
if (change.balanceChangeSign < 0)
changes.senders.emplace_back(std::move(change));
else
changes.receivers.emplace_back(std::move(change));
}
void
TransfersNotFrozen::recordBalanceChanges(
std::shared_ptr<SLE const> const& after,
STAmount const& balanceChange)
{
auto const balanceChangeSign = balanceChange.signum();
auto const currency = after->at(sfBalance).getCurrency();
// Change from low account's perspective, which is trust line default
recordBalance({currency, after->at(sfHighLimit).getIssuer()}, {after, balanceChangeSign});
// Change from high account's perspective, which reverses the sign.
recordBalance({currency, after->at(sfLowLimit).getIssuer()}, {after, -balanceChangeSign});
}
std::shared_ptr<SLE const>
TransfersNotFrozen::findIssuer(AccountID const& issuerID, ReadView const& view)
{
if (auto it = possibleIssuers_.find(issuerID); it != possibleIssuers_.end())
{
return it->second;
}
return view.read(keylet::account(issuerID));
}
bool
TransfersNotFrozen::validateIssuerChanges(
std::shared_ptr<SLE const> const& issuer,
IssuerChanges const& changes,
STTx const& tx,
beast::Journal const& j,
bool enforce)
{
if (!issuer)
{
return false;
}
bool const globalFreeze = issuer->isFlag(lsfGlobalFreeze);
if (changes.receivers.empty() || changes.senders.empty())
{
/* If there are no receivers, then the holder(s) are returning
* their tokens to the issuer. Likewise, if there are no
* senders, then the issuer is issuing tokens to the holder(s).
* This is allowed regardless of the issuer's freeze flags. (The
* holder may have contradicting freeze flags, but that will be
* checked when the holder is treated as issuer.)
*/
return true;
}
for (auto const& actors : {changes.senders, changes.receivers})
{
for (auto const& change : actors)
{
bool const high = change.line->at(sfLowLimit).getIssuer() == issuer->at(sfAccount);
if (!validateFrozenState(change, high, tx, j, enforce, globalFreeze))
{
return false;
}
}
}
return true;
}
bool
TransfersNotFrozen::validateFrozenState(
BalanceChange const& change,
bool high,
STTx const& tx,
beast::Journal const& j,
bool enforce,
bool globalFreeze)
{
bool const freeze =
change.balanceChangeSign < 0 && change.line->isFlag(high ? lsfLowFreeze : lsfHighFreeze);
bool const deepFreeze = change.line->isFlag(high ? lsfLowDeepFreeze : lsfHighDeepFreeze);
bool const frozen = globalFreeze || deepFreeze || freeze;
bool const isAMMLine = change.line->isFlag(lsfAMMNode);
if (!frozen)
{
return true;
}
// AMMClawbacks are allowed to override some freeze rules
if ((!isAMMLine || globalFreeze) && hasPrivilege(tx, overrideFreeze))
{
JLOG(j.debug()) << "Invariant check allowing funds to be moved "
<< (change.balanceChangeSign > 0 ? "to" : "from")
<< " a frozen trustline for AMMClawback " << tx.getTransactionID();
return true;
}
JLOG(j.fatal()) << "Invariant failed: Attempting to move frozen funds for "
<< tx.getTransactionID();
// The comment above starting with "assert(enforce)" explains this assert.
XRPL_ASSERT(
enforce,
"xrpl::TransfersNotFrozen::validateFrozenState : enforce "
"invariant.");
if (enforce)
{
return false;
}
return true;
}
} // namespace xrpl

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,278 @@
#include <xrpl/tx/invariants/LoanInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/STNumber.h>
#include <xrpl/protocol/TxFormats.h>
namespace xrpl {
void
ValidLoanBroker::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (after)
{
if (after->getType() == ltLOAN_BROKER)
{
auto& broker = brokers_[after->key()];
broker.brokerBefore = before;
broker.brokerAfter = after;
}
else if (after->getType() == ltACCOUNT_ROOT && after->isFieldPresent(sfLoanBrokerID))
{
auto const& loanBrokerID = after->at(sfLoanBrokerID);
// create an entry if one doesn't already exist
brokers_.emplace(loanBrokerID, BrokerInfo{});
}
else if (after->getType() == ltRIPPLE_STATE)
{
lines_.emplace_back(after);
}
else if (after->getType() == ltMPTOKEN)
{
mpts_.emplace_back(after);
}
}
}
bool
ValidLoanBroker::goodZeroDirectory(
ReadView const& view,
SLE::const_ref dir,
beast::Journal const& j) const
{
auto const next = dir->at(~sfIndexNext);
auto const prev = dir->at(~sfIndexPrevious);
if ((prev && *prev) || (next && *next))
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker with zero "
"OwnerCount has multiple directory pages";
return false;
}
auto indexes = dir->getFieldV256(sfIndexes);
if (indexes.size() > 1)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker with zero "
"OwnerCount has multiple indexes in the Directory root";
return false;
}
if (indexes.size() == 1)
{
auto const index = indexes.value().front();
auto const sle = view.read(keylet::unchecked(index));
if (!sle)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker directory corrupt";
return false;
}
if (sle->getType() != ltRIPPLE_STATE && sle->getType() != ltMPTOKEN)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker with zero "
"OwnerCount has an unexpected entry in the directory";
return false;
}
}
return true;
}
bool
ValidLoanBroker::finalize(
STTx const& tx,
TER const,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
// Loan Brokers will not exist on ledger if the Lending Protocol amendment
// is not enabled, so there's no need to check it.
for (auto const& line : lines_)
{
for (auto const& field : {&sfLowLimit, &sfHighLimit})
{
auto const account = view.read(keylet::account(line->at(*field).getIssuer()));
// This Invariant doesn't know about the rules for Trust Lines, so
// if the account is missing, don't treat it as an error. This
// loop is only concerned with finding Broker pseudo-accounts
if (account && account->isFieldPresent(sfLoanBrokerID))
{
auto const& loanBrokerID = account->at(sfLoanBrokerID);
// create an entry if one doesn't already exist
brokers_.emplace(loanBrokerID, BrokerInfo{});
}
}
}
for (auto const& mpt : mpts_)
{
auto const account = view.read(keylet::account(mpt->at(sfAccount)));
// This Invariant doesn't know about the rules for MPTokens, so
// if the account is missing, don't treat is as an error. This
// loop is only concerned with finding Broker pseudo-accounts
if (account && account->isFieldPresent(sfLoanBrokerID))
{
auto const& loanBrokerID = account->at(sfLoanBrokerID);
// create an entry if one doesn't already exist
brokers_.emplace(loanBrokerID, BrokerInfo{});
}
}
for (auto const& [brokerID, broker] : brokers_)
{
auto const& after =
broker.brokerAfter ? broker.brokerAfter : view.read(keylet::loanbroker(brokerID));
if (!after)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker missing";
return false;
}
auto const& before = broker.brokerBefore;
// https://github.com/Tapanito/XRPL-Standards/blob/xls-66-lending-protocol/XLS-0066d-lending-protocol/README.md#3123-invariants
// If `LoanBroker.OwnerCount = 0` the `DirectoryNode` will have at most
// one node (the root), which will only hold entries for `RippleState`
// or `MPToken` objects.
if (after->at(sfOwnerCount) == 0)
{
auto const dir = view.read(keylet::ownerDir(after->at(sfAccount)));
if (dir)
{
if (!goodZeroDirectory(view, dir, j))
{
return false;
}
}
}
if (before && before->at(sfLoanSequence) > after->at(sfLoanSequence))
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker sequence number "
"decreased";
return false;
}
if (after->at(sfDebtTotal) < 0)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker debt total is negative";
return false;
}
if (after->at(sfCoverAvailable) < 0)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker cover available is negative";
return false;
}
auto const vault = view.read(keylet::vault(after->at(sfVaultID)));
if (!vault)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker vault ID is invalid";
return false;
}
auto const& vaultAsset = vault->at(sfAsset);
if (after->at(sfCoverAvailable) < accountHolds(
view,
after->at(sfAccount),
vaultAsset,
FreezeHandling::fhIGNORE_FREEZE,
AuthHandling::ahIGNORE_AUTH,
j))
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker cover available "
"is less than pseudo-account asset balance";
return false;
}
}
return true;
}
//------------------------------------------------------------------------------
void
ValidLoan::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (after && after->getType() == ltLOAN)
{
loans_.emplace_back(before, after);
}
}
bool
ValidLoan::finalize(
STTx const& tx,
TER const,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
// Loans will not exist on ledger if the Lending Protocol amendment
// is not enabled, so there's no need to check it.
for (auto const& [before, after] : loans_)
{
// https://github.com/Tapanito/XRPL-Standards/blob/xls-66-lending-protocol/XLS-0066d-lending-protocol/README.md#3223-invariants
// If `Loan.PaymentRemaining = 0` then the loan MUST be fully paid off
if (after->at(sfPaymentRemaining) == 0 &&
(after->at(sfTotalValueOutstanding) != beast::zero ||
after->at(sfPrincipalOutstanding) != beast::zero ||
after->at(sfManagementFeeOutstanding) != beast::zero))
{
JLOG(j.fatal()) << "Invariant failed: Loan with zero payments "
"remaining has not been paid off";
return false;
}
// If `Loan.PaymentRemaining != 0` then the loan MUST NOT be fully paid
// off
if (after->at(sfPaymentRemaining) != 0 &&
after->at(sfTotalValueOutstanding) == beast::zero &&
after->at(sfPrincipalOutstanding) == beast::zero &&
after->at(sfManagementFeeOutstanding) == beast::zero)
{
JLOG(j.fatal()) << "Invariant failed: Loan with zero payments "
"remaining has not been paid off";
return false;
}
if (before && (before->isFlag(lsfLoanOverpayment) != after->isFlag(lsfLoanOverpayment)))
{
JLOG(j.fatal()) << "Invariant failed: Loan Overpayment flag changed";
return false;
}
// Must not be negative - STNumber
for (auto const field :
{&sfLoanServiceFee,
&sfLatePaymentFee,
&sfClosePaymentFee,
&sfPrincipalOutstanding,
&sfTotalValueOutstanding,
&sfManagementFeeOutstanding})
{
if (after->at(*field) < 0)
{
JLOG(j.fatal()) << "Invariant failed: " << field->getName() << " is negative ";
return false;
}
}
// Must be positive - STNumber
for (auto const field : {
&sfPeriodicPayment,
})
{
if (after->at(*field) <= 0)
{
JLOG(j.fatal()) << "Invariant failed: " << field->getName()
<< " is zero or negative ";
return false;
}
}
}
return true;
}
} // namespace xrpl

View File

@@ -0,0 +1,192 @@
#include <xrpl/tx/invariants/MPTInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/MPTIssue.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/tx/invariants/InvariantCheckPrivilege.h>
namespace xrpl {
void
ValidMPTIssuance::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (after && after->getType() == ltMPTOKEN_ISSUANCE)
{
if (isDelete)
mptIssuancesDeleted_++;
else if (!before)
mptIssuancesCreated_++;
}
if (after && after->getType() == ltMPTOKEN)
{
if (isDelete)
mptokensDeleted_++;
else if (!before)
{
mptokensCreated_++;
MPTIssue const mptIssue{after->at(sfMPTokenIssuanceID)};
if (mptIssue.getIssuer() == after->at(sfAccount))
mptCreatedByIssuer_ = true;
}
}
}
bool
ValidMPTIssuance::finalize(
STTx const& tx,
TER const result,
XRPAmount const _fee,
ReadView const& view,
beast::Journal const& j)
{
if (result == tesSUCCESS)
{
auto const& rules = view.rules();
[[maybe_unused]]
bool enforceCreatedByIssuer =
rules.enabled(featureSingleAssetVault) || rules.enabled(featureLendingProtocol);
if (mptCreatedByIssuer_)
{
JLOG(j.fatal()) << "Invariant failed: MPToken created for the MPT issuer";
// The comment above starting with "assert(enforce)" explains this
// assert.
XRPL_ASSERT_PARTS(
enforceCreatedByIssuer, "xrpl::ValidMPTIssuance::finalize", "no issuer MPToken");
if (enforceCreatedByIssuer)
return false;
}
auto const txnType = tx.getTxnType();
if (hasPrivilege(tx, createMPTIssuance))
{
if (mptIssuancesCreated_ == 0)
{
JLOG(j.fatal()) << "Invariant failed: transaction "
"succeeded without creating a MPT issuance";
}
else if (mptIssuancesDeleted_ != 0)
{
JLOG(j.fatal()) << "Invariant failed: transaction "
"succeeded while removing MPT issuances";
}
else if (mptIssuancesCreated_ > 1)
{
JLOG(j.fatal()) << "Invariant failed: transaction "
"succeeded but created multiple issuances";
}
return mptIssuancesCreated_ == 1 && mptIssuancesDeleted_ == 0;
}
if (hasPrivilege(tx, destroyMPTIssuance))
{
if (mptIssuancesDeleted_ == 0)
{
JLOG(j.fatal()) << "Invariant failed: MPT issuance deletion "
"succeeded without removing a MPT issuance";
}
else if (mptIssuancesCreated_ > 0)
{
JLOG(j.fatal()) << "Invariant failed: MPT issuance deletion "
"succeeded while creating MPT issuances";
}
else if (mptIssuancesDeleted_ > 1)
{
JLOG(j.fatal()) << "Invariant failed: MPT issuance deletion "
"succeeded but deleted multiple issuances";
}
return mptIssuancesCreated_ == 0 && mptIssuancesDeleted_ == 1;
}
bool const lendingProtocolEnabled = view.rules().enabled(featureLendingProtocol);
// ttESCROW_FINISH may authorize an MPT, but it can't have the
// mayAuthorizeMPT privilege, because that may cause
// non-amendment-gated side effects.
bool const enforceEscrowFinish = (txnType == ttESCROW_FINISH) &&
(view.rules().enabled(featureSingleAssetVault) || lendingProtocolEnabled);
if (hasPrivilege(tx, mustAuthorizeMPT | mayAuthorizeMPT) || enforceEscrowFinish)
{
bool const submittedByIssuer = tx.isFieldPresent(sfHolder);
if (mptIssuancesCreated_ > 0)
{
JLOG(j.fatal()) << "Invariant failed: MPT authorize "
"succeeded but created MPT issuances";
return false;
}
else if (mptIssuancesDeleted_ > 0)
{
JLOG(j.fatal()) << "Invariant failed: MPT authorize "
"succeeded but deleted issuances";
return false;
}
else if (lendingProtocolEnabled && mptokensCreated_ + mptokensDeleted_ > 1)
{
JLOG(j.fatal()) << "Invariant failed: MPT authorize succeeded "
"but created/deleted bad number mptokens";
return false;
}
else if (submittedByIssuer && (mptokensCreated_ > 0 || mptokensDeleted_ > 0))
{
JLOG(j.fatal()) << "Invariant failed: MPT authorize submitted by issuer "
"succeeded but created/deleted mptokens";
return false;
}
else if (
!submittedByIssuer && hasPrivilege(tx, mustAuthorizeMPT) &&
(mptokensCreated_ + mptokensDeleted_ != 1))
{
// if the holder submitted this tx, then a mptoken must be
// either created or deleted.
JLOG(j.fatal()) << "Invariant failed: MPT authorize submitted by holder "
"succeeded but created/deleted bad number of mptokens";
return false;
}
return true;
}
if (txnType == ttESCROW_FINISH)
{
// ttESCROW_FINISH may authorize an MPT, but it can't have the
// mayAuthorizeMPT privilege, because that may cause
// non-amendment-gated side effects.
XRPL_ASSERT_PARTS(
!enforceEscrowFinish, "xrpl::ValidMPTIssuance::finalize", "not escrow finish tx");
return true;
}
if (hasPrivilege(tx, mayDeleteMPT) && mptokensDeleted_ == 1 && mptokensCreated_ == 0 &&
mptIssuancesCreated_ == 0 && mptIssuancesDeleted_ == 0)
return true;
}
if (mptIssuancesCreated_ != 0)
{
JLOG(j.fatal()) << "Invariant failed: a MPT issuance was created";
}
else if (mptIssuancesDeleted_ != 0)
{
JLOG(j.fatal()) << "Invariant failed: a MPT issuance was deleted";
}
else if (mptokensCreated_ != 0)
{
JLOG(j.fatal()) << "Invariant failed: a MPToken was created";
}
else if (mptokensDeleted_ != 0)
{
JLOG(j.fatal()) << "Invariant failed: a MPToken was deleted";
}
return mptIssuancesCreated_ == 0 && mptIssuancesDeleted_ == 0 && mptokensCreated_ == 0 &&
mptokensDeleted_ == 0;
}
} // namespace xrpl

View File

@@ -0,0 +1,274 @@
#include <xrpl/tx/invariants/NFTInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/nftPageMask.h>
#include <xrpl/tx/invariants/InvariantCheckPrivilege.h>
#include <xrpl/tx/transactors/NFT/NFTokenUtils.h>
namespace xrpl {
void
ValidNFTokenPage::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
static constexpr uint256 const& pageBits = nft::pageMask;
static constexpr uint256 const accountBits = ~pageBits;
if ((before && before->getType() != ltNFTOKEN_PAGE) ||
(after && after->getType() != ltNFTOKEN_PAGE))
return;
auto check = [this, isDelete](std::shared_ptr<SLE const> const& sle) {
uint256 const account = sle->key() & accountBits;
uint256 const hiLimit = sle->key() & pageBits;
std::optional<uint256> const prev = (*sle)[~sfPreviousPageMin];
// Make sure that any page links...
// 1. Are properly associated with the owning account and
// 2. The page is correctly ordered between links.
if (prev)
{
if (account != (*prev & accountBits))
badLink_ = true;
if (hiLimit <= (*prev & pageBits))
badLink_ = true;
}
if (auto const next = (*sle)[~sfNextPageMin])
{
if (account != (*next & accountBits))
badLink_ = true;
if (hiLimit >= (*next & pageBits))
badLink_ = true;
}
{
auto const& nftokens = sle->getFieldArray(sfNFTokens);
// An NFTokenPage should never contain too many tokens or be empty.
if (std::size_t const nftokenCount = nftokens.size();
(!isDelete && nftokenCount == 0) || nftokenCount > dirMaxTokensPerPage)
invalidSize_ = true;
// If prev is valid, use it to establish a lower bound for
// page entries. If prev is not valid the lower bound is zero.
uint256 const loLimit = prev ? *prev & pageBits : uint256(beast::zero);
// Also verify that all NFTokenIDs in the page are sorted.
uint256 loCmp = loLimit;
for (auto const& obj : nftokens)
{
uint256 const tokenID = obj[sfNFTokenID];
if (!nft::compareTokens(loCmp, tokenID))
badSort_ = true;
loCmp = tokenID;
// None of the NFTs on this page should belong on lower or
// higher pages.
if (uint256 const tokenPageBits = tokenID & pageBits;
tokenPageBits < loLimit || tokenPageBits >= hiLimit)
badEntry_ = true;
if (auto uri = obj[~sfURI]; uri && uri->empty())
badURI_ = true;
}
}
};
if (before)
{
check(before);
// While an account's NFToken directory contains any NFTokens, the last
// NFTokenPage (with 96 bits of 1 in the low part of the index) should
// never be deleted.
if (isDelete && (before->key() & nft::pageMask) == nft::pageMask &&
before->isFieldPresent(sfPreviousPageMin))
{
deletedFinalPage_ = true;
}
}
if (after)
check(after);
if (!isDelete && before && after)
{
// If the NFTokenPage
// 1. Has a NextMinPage field in before, but loses it in after, and
// 2. This is not the last page in the directory
// Then we have identified a corruption in the links between the
// NFToken pages in the NFToken directory.
if ((before->key() & nft::pageMask) != nft::pageMask &&
before->isFieldPresent(sfNextPageMin) && !after->isFieldPresent(sfNextPageMin))
{
deletedLink_ = true;
}
}
}
bool
ValidNFTokenPage::finalize(
STTx const& tx,
TER const result,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
if (badLink_)
{
JLOG(j.fatal()) << "Invariant failed: NFT page is improperly linked.";
return false;
}
if (badEntry_)
{
JLOG(j.fatal()) << "Invariant failed: NFT found in incorrect page.";
return false;
}
if (badSort_)
{
JLOG(j.fatal()) << "Invariant failed: NFTs on page are not sorted.";
return false;
}
if (badURI_)
{
JLOG(j.fatal()) << "Invariant failed: NFT contains empty URI.";
return false;
}
if (invalidSize_)
{
JLOG(j.fatal()) << "Invariant failed: NFT page has invalid size.";
return false;
}
if (view.rules().enabled(fixNFTokenPageLinks))
{
if (deletedFinalPage_)
{
JLOG(j.fatal()) << "Invariant failed: Last NFT page deleted with "
"non-empty directory.";
return false;
}
if (deletedLink_)
{
JLOG(j.fatal()) << "Invariant failed: Lost NextMinPage link.";
return false;
}
}
return true;
}
//------------------------------------------------------------------------------
void
NFTokenCountTracking::visitEntry(
bool,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (before && before->getType() == ltACCOUNT_ROOT)
{
beforeMintedTotal += (*before)[~sfMintedNFTokens].value_or(0);
beforeBurnedTotal += (*before)[~sfBurnedNFTokens].value_or(0);
}
if (after && after->getType() == ltACCOUNT_ROOT)
{
afterMintedTotal += (*after)[~sfMintedNFTokens].value_or(0);
afterBurnedTotal += (*after)[~sfBurnedNFTokens].value_or(0);
}
}
bool
NFTokenCountTracking::finalize(
STTx const& tx,
TER const result,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
if (!hasPrivilege(tx, changeNFTCounts))
{
if (beforeMintedTotal != afterMintedTotal)
{
JLOG(j.fatal()) << "Invariant failed: the number of minted tokens "
"changed without a mint transaction!";
return false;
}
if (beforeBurnedTotal != afterBurnedTotal)
{
JLOG(j.fatal()) << "Invariant failed: the number of burned tokens "
"changed without a burn transaction!";
return false;
}
return true;
}
if (tx.getTxnType() == ttNFTOKEN_MINT)
{
if (result == tesSUCCESS && beforeMintedTotal >= afterMintedTotal)
{
JLOG(j.fatal()) << "Invariant failed: successful minting didn't increase "
"the number of minted tokens.";
return false;
}
if (result != tesSUCCESS && beforeMintedTotal != afterMintedTotal)
{
JLOG(j.fatal()) << "Invariant failed: failed minting changed the "
"number of minted tokens.";
return false;
}
if (beforeBurnedTotal != afterBurnedTotal)
{
JLOG(j.fatal()) << "Invariant failed: minting changed the number of "
"burned tokens.";
return false;
}
}
if (tx.getTxnType() == ttNFTOKEN_BURN)
{
if (result == tesSUCCESS)
{
if (beforeBurnedTotal >= afterBurnedTotal)
{
JLOG(j.fatal()) << "Invariant failed: successful burning didn't increase "
"the number of burned tokens.";
return false;
}
}
if (result != tesSUCCESS && beforeBurnedTotal != afterBurnedTotal)
{
JLOG(j.fatal()) << "Invariant failed: failed burning changed the "
"number of burned tokens.";
return false;
}
if (beforeMintedTotal != afterMintedTotal)
{
JLOG(j.fatal()) << "Invariant failed: burning changed the number of "
"minted tokens.";
return false;
}
}
return true;
}
} // namespace xrpl

View File

@@ -0,0 +1,93 @@
#include <xrpl/tx/invariants/PermissionedDEXInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/STArray.h>
#include <xrpl/protocol/TxFormats.h>
namespace xrpl {
void
ValidPermissionedDEX::visitEntry(
bool,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (after && after->getType() == ltDIR_NODE)
{
if (after->isFieldPresent(sfDomainID))
domains_.insert(after->getFieldH256(sfDomainID));
}
if (after && after->getType() == ltOFFER)
{
if (after->isFieldPresent(sfDomainID))
domains_.insert(after->getFieldH256(sfDomainID));
else
regularOffers_ = true;
// if a hybrid offer is missing domain or additional book, there's
// something wrong
if (after->isFlag(lsfHybrid) &&
(!after->isFieldPresent(sfDomainID) || !after->isFieldPresent(sfAdditionalBooks) ||
after->getFieldArray(sfAdditionalBooks).size() > 1))
badHybrids_ = true;
}
}
bool
ValidPermissionedDEX::finalize(
STTx const& tx,
TER const result,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
auto const txType = tx.getTxnType();
if ((txType != ttPAYMENT && txType != ttOFFER_CREATE) || result != tesSUCCESS)
return true;
// For each offercreate transaction, check if
// permissioned offers are valid
if (txType == ttOFFER_CREATE && badHybrids_)
{
JLOG(j.fatal()) << "Invariant failed: hybrid offer is malformed";
return false;
}
if (!tx.isFieldPresent(sfDomainID))
return true;
auto const domain = tx.getFieldH256(sfDomainID);
if (!view.exists(keylet::permissionedDomain(domain)))
{
JLOG(j.fatal()) << "Invariant failed: domain doesn't exist";
return false;
}
// for both payment and offercreate, there shouldn't be another domain
// that's different from the domain specified
for (auto const& d : domains_)
{
if (d != domain)
{
JLOG(j.fatal()) << "Invariant failed: transaction"
" consumed wrong domains";
return false;
}
}
if (regularOffers_)
{
JLOG(j.fatal()) << "Invariant failed: domain transaction"
" affected regular offers";
return false;
}
return true;
}
} // namespace xrpl

View File

@@ -0,0 +1,162 @@
#include <xrpl/tx/invariants/PermissionedDomainInvariant.h>
//
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/STArray.h>
#include <xrpl/protocol/TxFormats.h>
namespace xrpl {
void
ValidPermissionedDomain::visitEntry(
bool isDel,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
if (before && before->getType() != ltPERMISSIONED_DOMAIN)
return;
if (after && after->getType() != ltPERMISSIONED_DOMAIN)
return;
auto check = [isDel](std::vector<SleStatus>& sleStatus, std::shared_ptr<SLE const> const& sle) {
auto const& credentials = sle->getFieldArray(sfAcceptedCredentials);
auto const sorted = credentials::makeSorted(credentials);
SleStatus ss{credentials.size(), false, !sorted.empty(), isDel};
// If array have duplicates then all the other checks are invalid
if (ss.isUnique_)
{
unsigned i = 0;
for (auto const& cred : sorted)
{
auto const& credTx = credentials[i++];
ss.isSorted_ =
(cred.first == credTx[sfIssuer]) && (cred.second == credTx[sfCredentialType]);
if (!ss.isSorted_)
break;
}
}
sleStatus.emplace_back(std::move(ss));
};
if (after)
check(sleStatus_, after);
}
bool
ValidPermissionedDomain::finalize(
STTx const& tx,
TER const result,
XRPAmount const,
ReadView const& view,
beast::Journal const& j)
{
auto check = [](SleStatus const& sleStatus, beast::Journal const& j) {
if (!sleStatus.credentialsSize_)
{
JLOG(j.fatal()) << "Invariant failed: permissioned domain with "
"no rules.";
return false;
}
if (sleStatus.credentialsSize_ > maxPermissionedDomainCredentialsArraySize)
{
JLOG(j.fatal()) << "Invariant failed: permissioned domain bad "
"credentials size "
<< sleStatus.credentialsSize_;
return false;
}
if (!sleStatus.isUnique_)
{
JLOG(j.fatal()) << "Invariant failed: permissioned domain credentials "
"aren't unique";
return false;
}
if (!sleStatus.isSorted_)
{
JLOG(j.fatal()) << "Invariant failed: permissioned domain credentials "
"aren't sorted";
return false;
}
return true;
};
if (view.rules().enabled(fixPermissionedDomainInvariant))
{
// No permissioned domains should be affected if the transaction failed
if (result != tesSUCCESS)
// If nothing changed, all is good. If there were changes, that's
// bad.
return sleStatus_.empty();
if (sleStatus_.size() > 1)
{
JLOG(j.fatal()) << "Invariant failed: transaction affected more "
"than 1 permissioned domain entry.";
return false;
}
switch (tx.getTxnType())
{
case ttPERMISSIONED_DOMAIN_SET: {
if (sleStatus_.empty())
{
JLOG(j.fatal()) << "Invariant failed: no domain objects affected by "
"PermissionedDomainSet";
return false;
}
auto const& sleStatus = sleStatus_[0];
if (sleStatus.isDelete_)
{
JLOG(j.fatal()) << "Invariant failed: domain object "
"deleted by PermissionedDomainSet";
return false;
}
return check(sleStatus, j);
}
case ttPERMISSIONED_DOMAIN_DELETE: {
if (sleStatus_.empty())
{
JLOG(j.fatal()) << "Invariant failed: no domain objects affected by "
"PermissionedDomainDelete";
return false;
}
if (!sleStatus_[0].isDelete_)
{
JLOG(j.fatal()) << "Invariant failed: domain object "
"modified, but not deleted by "
"PermissionedDomainDelete";
return false;
}
return true;
}
default: {
if (!sleStatus_.empty())
{
JLOG(j.fatal()) << "Invariant failed: " << sleStatus_.size()
<< " domain object(s) affected by an "
"unauthorized transaction. "
<< tx.getTxnType();
return false;
}
return true;
}
}
}
else
{
if (tx.getTxnType() != ttPERMISSIONED_DOMAIN_SET || result != tesSUCCESS ||
sleStatus_.empty())
return true;
return check(sleStatus_[0], j);
}
}
} // namespace xrpl

View File

@@ -0,0 +1,926 @@
#include <xrpl/tx/invariants/VaultInvariant.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/STNumber.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/tx/invariants/InvariantCheckPrivilege.h>
namespace xrpl {
ValidVault::Vault
ValidVault::Vault::make(SLE const& from)
{
XRPL_ASSERT(from.getType() == ltVAULT, "ValidVault::Vault::make : from Vault object");
ValidVault::Vault self;
self.key = from.key();
self.asset = from.at(sfAsset);
self.pseudoId = from.getAccountID(sfAccount);
self.owner = from.at(sfOwner);
self.shareMPTID = from.getFieldH192(sfShareMPTID);
self.assetsTotal = from.at(sfAssetsTotal);
self.assetsAvailable = from.at(sfAssetsAvailable);
self.assetsMaximum = from.at(sfAssetsMaximum);
self.lossUnrealized = from.at(sfLossUnrealized);
return self;
}
ValidVault::Shares
ValidVault::Shares::make(SLE const& from)
{
XRPL_ASSERT(
from.getType() == ltMPTOKEN_ISSUANCE,
"ValidVault::Shares::make : from MPTokenIssuance object");
ValidVault::Shares self;
self.share = MPTIssue(makeMptID(from.getFieldU32(sfSequence), from.getAccountID(sfIssuer)));
self.sharesTotal = from.at(sfOutstandingAmount);
self.sharesMaximum = from[~sfMaximumAmount].value_or(maxMPTokenAmount);
return self;
}
void
ValidVault::visitEntry(
bool isDelete,
std::shared_ptr<SLE const> const& before,
std::shared_ptr<SLE const> const& after)
{
// If `before` is empty, this means an object is being created, in which
// case `isDelete` must be false. Otherwise `before` and `after` are set and
// `isDelete` indicates whether an object is being deleted or modified.
XRPL_ASSERT(
after != nullptr && (before != nullptr || !isDelete),
"xrpl::ValidVault::visitEntry : some object is available");
// Number balanceDelta will capture the difference (delta) between "before"
// state (zero if created) and "after" state (zero if destroyed), so the
// invariants can validate that the change in account balances matches the
// change in vault balances, stored to deltas_ at the end of this function.
Number balanceDelta{};
std::int8_t sign = 0;
if (before)
{
switch (before->getType())
{
case ltVAULT:
beforeVault_.push_back(Vault::make(*before));
break;
case ltMPTOKEN_ISSUANCE:
// At this moment we have no way of telling if this object holds
// vault shares or something else. Save it for finalize.
beforeMPTs_.push_back(Shares::make(*before));
balanceDelta = static_cast<std::int64_t>(before->getFieldU64(sfOutstandingAmount));
sign = 1;
break;
case ltMPTOKEN:
balanceDelta = static_cast<std::int64_t>(before->getFieldU64(sfMPTAmount));
sign = -1;
break;
case ltACCOUNT_ROOT:
case ltRIPPLE_STATE:
balanceDelta = before->getFieldAmount(sfBalance);
sign = -1;
break;
default:;
}
}
if (!isDelete && after)
{
switch (after->getType())
{
case ltVAULT:
afterVault_.push_back(Vault::make(*after));
break;
case ltMPTOKEN_ISSUANCE:
// At this moment we have no way of telling if this object holds
// vault shares or something else. Save it for finalize.
afterMPTs_.push_back(Shares::make(*after));
balanceDelta -=
Number(static_cast<std::int64_t>(after->getFieldU64(sfOutstandingAmount)));
sign = 1;
break;
case ltMPTOKEN:
balanceDelta -= Number(static_cast<std::int64_t>(after->getFieldU64(sfMPTAmount)));
sign = -1;
break;
case ltACCOUNT_ROOT:
case ltRIPPLE_STATE:
balanceDelta -= Number(after->getFieldAmount(sfBalance));
sign = -1;
break;
default:;
}
}
uint256 const key = (before ? before->key() : after->key());
// Append to deltas if sign is non-zero, i.e. an object of an interesting
// type has been updated. A transaction may update an object even when
// its balance has not changed, e.g. transaction fee equals the amount
// transferred to the account. We intentionally do not compare balanceDelta
// against zero, to avoid missing such updates.
if (sign != 0)
deltas_[key] = balanceDelta * sign;
}
bool
ValidVault::finalize(
STTx const& tx,
TER const ret,
XRPAmount const fee,
ReadView const& view,
beast::Journal const& j)
{
bool const enforce = view.rules().enabled(featureSingleAssetVault);
if (!isTesSuccess(ret))
return true; // Do not perform checks
if (afterVault_.empty() && beforeVault_.empty())
{
if (hasPrivilege(tx, mustModifyVault))
{
JLOG(j.fatal()) << //
"Invariant failed: vault operation succeeded without modifying "
"a vault";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : vault noop invariant");
return !enforce;
}
return true; // Not a vault operation
}
else if (!(hasPrivilege(tx, mustModifyVault) || hasPrivilege(tx, mayModifyVault)))
{
JLOG(j.fatal()) << //
"Invariant failed: vault updated by a wrong transaction type";
XRPL_ASSERT(
enforce,
"xrpl::ValidVault::finalize : illegal vault transaction "
"invariant");
return !enforce; // Also not a vault operation
}
if (beforeVault_.size() > 1 || afterVault_.size() > 1)
{
JLOG(j.fatal()) << //
"Invariant failed: vault operation updated more than single vault";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : single vault invariant");
return !enforce; // That's all we can do here
}
auto const txnType = tx.getTxnType();
// We do special handling for ttVAULT_DELETE first, because it's the only
// vault-modifying transaction without an "after" state of the vault
if (afterVault_.empty())
{
if (txnType != ttVAULT_DELETE)
{
JLOG(j.fatal()) << //
"Invariant failed: vault deleted by a wrong transaction type";
XRPL_ASSERT(
enforce,
"xrpl::ValidVault::finalize : illegal vault deletion "
"invariant");
return !enforce; // That's all we can do here
}
// Note, if afterVault_ is empty then we know that beforeVault_ is not
// empty, as enforced at the top of this function
auto const& beforeVault = beforeVault_[0];
// At this moment we only know a vault is being deleted and there
// might be some MPTokenIssuance objects which are deleted in the
// same transaction. Find the one matching this vault.
auto const deletedShares = [&]() -> std::optional<Shares> {
for (auto const& e : beforeMPTs_)
{
if (e.share.getMptID() == beforeVault.shareMPTID)
return std::move(e);
}
return std::nullopt;
}();
if (!deletedShares)
{
JLOG(j.fatal()) << "Invariant failed: deleted vault must also "
"delete shares";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : shares deletion invariant");
return !enforce; // That's all we can do here
}
bool result = true;
if (deletedShares->sharesTotal != 0)
{
JLOG(j.fatal()) << "Invariant failed: deleted vault must have no "
"shares outstanding";
result = false;
}
if (beforeVault.assetsTotal != zero)
{
JLOG(j.fatal()) << "Invariant failed: deleted vault must have no "
"assets outstanding";
result = false;
}
if (beforeVault.assetsAvailable != zero)
{
JLOG(j.fatal()) << "Invariant failed: deleted vault must have no "
"assets available";
result = false;
}
return result;
}
else if (txnType == ttVAULT_DELETE)
{
JLOG(j.fatal()) << "Invariant failed: vault deletion succeeded without "
"deleting a vault";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : vault deletion invariant");
return !enforce; // That's all we can do here
}
// Note, `afterVault_.empty()` is handled above
auto const& afterVault = afterVault_[0];
XRPL_ASSERT(
beforeVault_.empty() || beforeVault_[0].key == afterVault.key,
"xrpl::ValidVault::finalize : single vault operation");
auto const updatedShares = [&]() -> std::optional<Shares> {
// At this moment we only know that a vault is being updated and there
// might be some MPTokenIssuance objects which are also updated in the
// same transaction. Find the one matching the shares to this vault.
// Note, we expect updatedMPTs collection to be extremely small. For
// such collections linear search is faster than lookup.
for (auto const& e : afterMPTs_)
{
if (e.share.getMptID() == afterVault.shareMPTID)
return e;
}
auto const sleShares = view.read(keylet::mptIssuance(afterVault.shareMPTID));
return sleShares ? std::optional<Shares>(Shares::make(*sleShares)) : std::nullopt;
}();
bool result = true;
// Universal transaction checks
if (!beforeVault_.empty())
{
auto const& beforeVault = beforeVault_[0];
if (afterVault.asset != beforeVault.asset || afterVault.pseudoId != beforeVault.pseudoId ||
afterVault.shareMPTID != beforeVault.shareMPTID)
{
JLOG(j.fatal()) << "Invariant failed: violation of vault immutable data";
result = false;
}
}
if (!updatedShares)
{
JLOG(j.fatal()) << "Invariant failed: updated vault must have shares";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : vault has shares invariant");
return !enforce; // That's all we can do here
}
if (updatedShares->sharesTotal == 0)
{
if (afterVault.assetsTotal != zero)
{
JLOG(j.fatal()) << "Invariant failed: updated zero sized "
"vault must have no assets outstanding";
result = false;
}
if (afterVault.assetsAvailable != zero)
{
JLOG(j.fatal()) << "Invariant failed: updated zero sized "
"vault must have no assets available";
result = false;
}
}
else if (updatedShares->sharesTotal > updatedShares->sharesMaximum)
{
JLOG(j.fatal()) //
<< "Invariant failed: updated shares must not exceed maximum "
<< updatedShares->sharesMaximum;
result = false;
}
if (afterVault.assetsAvailable < zero)
{
JLOG(j.fatal()) << "Invariant failed: assets available must be positive";
result = false;
}
if (afterVault.assetsAvailable > afterVault.assetsTotal)
{
JLOG(j.fatal()) << "Invariant failed: assets available must "
"not be greater than assets outstanding";
result = false;
}
else if (afterVault.lossUnrealized > afterVault.assetsTotal - afterVault.assetsAvailable)
{
JLOG(j.fatal()) //
<< "Invariant failed: loss unrealized must not exceed "
"the difference between assets outstanding and available";
result = false;
}
if (afterVault.assetsTotal < zero)
{
JLOG(j.fatal()) << "Invariant failed: assets outstanding must be positive";
result = false;
}
if (afterVault.assetsMaximum < zero)
{
JLOG(j.fatal()) << "Invariant failed: assets maximum must be positive";
result = false;
}
// Thanks to this check we can simply do `assert(!beforeVault_.empty()` when
// enforcing invariants on transaction types other than ttVAULT_CREATE
if (beforeVault_.empty() && txnType != ttVAULT_CREATE)
{
JLOG(j.fatal()) << //
"Invariant failed: vault created by a wrong transaction type";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : vault creation invariant");
return !enforce; // That's all we can do here
}
if (!beforeVault_.empty() && afterVault.lossUnrealized != beforeVault_[0].lossUnrealized &&
txnType != ttLOAN_MANAGE && txnType != ttLOAN_PAY)
{
JLOG(j.fatal()) << //
"Invariant failed: vault transaction must not change loss "
"unrealized";
result = false;
}
auto const beforeShares = [&]() -> std::optional<Shares> {
if (beforeVault_.empty())
return std::nullopt;
auto const& beforeVault = beforeVault_[0];
for (auto const& e : beforeMPTs_)
{
if (e.share.getMptID() == beforeVault.shareMPTID)
return std::move(e);
}
return std::nullopt;
}();
if (!beforeShares &&
(tx.getTxnType() == ttVAULT_DEPOSIT || //
tx.getTxnType() == ttVAULT_WITHDRAW || //
tx.getTxnType() == ttVAULT_CLAWBACK))
{
JLOG(j.fatal()) << "Invariant failed: vault operation succeeded "
"without updating shares";
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : shares noop invariant");
return !enforce; // That's all we can do here
}
auto const& vaultAsset = afterVault.asset;
auto const deltaAssets = [&](AccountID const& id) -> std::optional<Number> {
auto const get = //
[&](auto const& it, std::int8_t sign = 1) -> std::optional<Number> {
if (it == deltas_.end())
return std::nullopt;
return it->second * sign;
};
return std::visit(
[&]<typename TIss>(TIss const& issue) {
if constexpr (std::is_same_v<TIss, Issue>)
{
if (isXRP(issue))
return get(deltas_.find(keylet::account(id).key));
return get(
deltas_.find(keylet::line(id, issue).key), id > issue.getIssuer() ? -1 : 1);
}
else if constexpr (std::is_same_v<TIss, MPTIssue>)
{
return get(deltas_.find(keylet::mptoken(issue.getMptID(), id).key));
}
},
vaultAsset.value());
};
auto const deltaAssetsTxAccount = [&]() -> std::optional<Number> {
auto ret = deltaAssets(tx[sfAccount]);
// Nothing returned or not XRP transaction
if (!ret.has_value() || !vaultAsset.native())
return ret;
// Delegated transaction; no need to compensate for fees
if (auto const delegate = tx[~sfDelegate];
delegate.has_value() && *delegate != tx[sfAccount])
return ret;
*ret += fee.drops();
if (*ret == zero)
return std::nullopt;
return ret;
};
auto const deltaShares = [&](AccountID const& id) -> std::optional<Number> {
auto const it = [&]() {
if (id == afterVault.pseudoId)
return deltas_.find(keylet::mptIssuance(afterVault.shareMPTID).key);
return deltas_.find(keylet::mptoken(afterVault.shareMPTID, id).key);
}();
return it != deltas_.end() ? std::optional<Number>(it->second) : std::nullopt;
};
auto const vaultHoldsNoAssets = [&](Vault const& vault) {
return vault.assetsAvailable == 0 && vault.assetsTotal == 0;
};
// Technically this does not need to be a lambda, but it's more
// convenient thanks to early "return false"; the not-so-nice
// alternatives are several layers of nested if/else or more complex
// (i.e. brittle) if statements.
result &= [&]() {
switch (txnType)
{
case ttVAULT_CREATE: {
bool result = true;
if (!beforeVault_.empty())
{
JLOG(j.fatal()) //
<< "Invariant failed: create operation must not have "
"updated a vault";
result = false;
}
if (afterVault.assetsAvailable != zero || afterVault.assetsTotal != zero ||
afterVault.lossUnrealized != zero || updatedShares->sharesTotal != 0)
{
JLOG(j.fatal()) //
<< "Invariant failed: created vault must be empty";
result = false;
}
if (afterVault.pseudoId != updatedShares->share.getIssuer())
{
JLOG(j.fatal()) //
<< "Invariant failed: shares issuer and vault "
"pseudo-account must be the same";
result = false;
}
auto const sleSharesIssuer =
view.read(keylet::account(updatedShares->share.getIssuer()));
if (!sleSharesIssuer)
{
JLOG(j.fatal()) //
<< "Invariant failed: shares issuer must exist";
return false;
}
if (!isPseudoAccount(sleSharesIssuer))
{
JLOG(j.fatal()) //
<< "Invariant failed: shares issuer must be a "
"pseudo-account";
result = false;
}
if (auto const vaultId = (*sleSharesIssuer)[~sfVaultID];
!vaultId || *vaultId != afterVault.key)
{
JLOG(j.fatal()) //
<< "Invariant failed: shares issuer pseudo-account "
"must point back to the vault";
result = false;
}
return result;
}
case ttVAULT_SET: {
bool result = true;
XRPL_ASSERT(
!beforeVault_.empty(), "xrpl::ValidVault::finalize : set updated a vault");
auto const& beforeVault = beforeVault_[0];
auto const vaultDeltaAssets = deltaAssets(afterVault.pseudoId);
if (vaultDeltaAssets)
{
JLOG(j.fatal()) << //
"Invariant failed: set must not change vault balance";
result = false;
}
if (beforeVault.assetsTotal != afterVault.assetsTotal)
{
JLOG(j.fatal()) << //
"Invariant failed: set must not change assets "
"outstanding";
result = false;
}
if (afterVault.assetsMaximum > zero &&
afterVault.assetsTotal > afterVault.assetsMaximum)
{
JLOG(j.fatal()) << //
"Invariant failed: set assets outstanding must not "
"exceed assets maximum";
result = false;
}
if (beforeVault.assetsAvailable != afterVault.assetsAvailable)
{
JLOG(j.fatal()) << //
"Invariant failed: set must not change assets "
"available";
result = false;
}
if (beforeShares && updatedShares &&
beforeShares->sharesTotal != updatedShares->sharesTotal)
{
JLOG(j.fatal()) << //
"Invariant failed: set must not change shares "
"outstanding";
result = false;
}
return result;
}
case ttVAULT_DEPOSIT: {
bool result = true;
XRPL_ASSERT(
!beforeVault_.empty(), "xrpl::ValidVault::finalize : deposit updated a vault");
auto const& beforeVault = beforeVault_[0];
auto const vaultDeltaAssets = deltaAssets(afterVault.pseudoId);
if (!vaultDeltaAssets)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must change vault balance";
return false; // That's all we can do
}
if (*vaultDeltaAssets > tx[sfAmount])
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must not change vault "
"balance by more than deposited amount";
result = false;
}
if (*vaultDeltaAssets <= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must increase vault balance";
result = false;
}
// Any payments (including deposits) made by the issuer
// do not change their balance, but create funds instead.
bool const issuerDeposit = [&]() -> bool {
if (vaultAsset.native())
return false;
return tx[sfAccount] == vaultAsset.getIssuer();
}();
if (!issuerDeposit)
{
auto const accountDeltaAssets = deltaAssetsTxAccount();
if (!accountDeltaAssets)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must change depositor "
"balance";
return false;
}
if (*accountDeltaAssets >= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must decrease depositor "
"balance";
result = false;
}
if (*accountDeltaAssets * -1 != *vaultDeltaAssets)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must change vault and "
"depositor balance by equal amount";
result = false;
}
}
if (afterVault.assetsMaximum > zero &&
afterVault.assetsTotal > afterVault.assetsMaximum)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit assets outstanding must not "
"exceed assets maximum";
result = false;
}
auto const accountDeltaShares = deltaShares(tx[sfAccount]);
if (!accountDeltaShares)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must change depositor "
"shares";
return false; // That's all we can do
}
if (*accountDeltaShares <= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must increase depositor "
"shares";
result = false;
}
auto const vaultDeltaShares = deltaShares(afterVault.pseudoId);
if (!vaultDeltaShares || *vaultDeltaShares == zero)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must change vault shares";
return false; // That's all we can do
}
if (*vaultDeltaShares * -1 != *accountDeltaShares)
{
JLOG(j.fatal()) << //
"Invariant failed: deposit must change depositor and "
"vault shares by equal amount";
result = false;
}
if (beforeVault.assetsTotal + *vaultDeltaAssets != afterVault.assetsTotal)
{
JLOG(j.fatal()) << "Invariant failed: deposit and assets "
"outstanding must add up";
result = false;
}
if (beforeVault.assetsAvailable + *vaultDeltaAssets != afterVault.assetsAvailable)
{
JLOG(j.fatal()) << "Invariant failed: deposit and assets "
"available must add up";
result = false;
}
return result;
}
case ttVAULT_WITHDRAW: {
bool result = true;
XRPL_ASSERT(
!beforeVault_.empty(),
"xrpl::ValidVault::finalize : withdrawal updated a "
"vault");
auto const& beforeVault = beforeVault_[0];
auto const vaultDeltaAssets = deltaAssets(afterVault.pseudoId);
if (!vaultDeltaAssets)
{
JLOG(j.fatal()) << "Invariant failed: withdrawal must "
"change vault balance";
return false; // That's all we can do
}
if (*vaultDeltaAssets >= zero)
{
JLOG(j.fatal()) << "Invariant failed: withdrawal must "
"decrease vault balance";
result = false;
}
// Any payments (including withdrawal) going to the issuer
// do not change their balance, but destroy funds instead.
bool const issuerWithdrawal = [&]() -> bool {
if (vaultAsset.native())
return false;
auto const destination = tx[~sfDestination].value_or(tx[sfAccount]);
return destination == vaultAsset.getIssuer();
}();
if (!issuerWithdrawal)
{
auto const accountDeltaAssets = deltaAssetsTxAccount();
auto const otherAccountDelta = [&]() -> std::optional<Number> {
if (auto const destination = tx[~sfDestination];
destination && *destination != tx[sfAccount])
return deltaAssets(*destination);
return std::nullopt;
}();
if (accountDeltaAssets.has_value() == otherAccountDelta.has_value())
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must change one "
"destination balance";
return false;
}
auto const destinationDelta = //
accountDeltaAssets ? *accountDeltaAssets : *otherAccountDelta;
if (destinationDelta <= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must increase "
"destination balance";
result = false;
}
if (*vaultDeltaAssets * -1 != destinationDelta)
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must change vault "
"and destination balance by equal amount";
result = false;
}
}
auto const accountDeltaShares = deltaShares(tx[sfAccount]);
if (!accountDeltaShares)
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must change depositor "
"shares";
return false;
}
if (*accountDeltaShares >= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must decrease depositor "
"shares";
result = false;
}
auto const vaultDeltaShares = deltaShares(afterVault.pseudoId);
if (!vaultDeltaShares || *vaultDeltaShares == zero)
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must change vault shares";
return false; // That's all we can do
}
if (*vaultDeltaShares * -1 != *accountDeltaShares)
{
JLOG(j.fatal()) << //
"Invariant failed: withdrawal must change depositor "
"and vault shares by equal amount";
result = false;
}
// Note, vaultBalance is negative (see check above)
if (beforeVault.assetsTotal + *vaultDeltaAssets != afterVault.assetsTotal)
{
JLOG(j.fatal()) << "Invariant failed: withdrawal and "
"assets outstanding must add up";
result = false;
}
if (beforeVault.assetsAvailable + *vaultDeltaAssets != afterVault.assetsAvailable)
{
JLOG(j.fatal()) << "Invariant failed: withdrawal and "
"assets available must add up";
result = false;
}
return result;
}
case ttVAULT_CLAWBACK: {
bool result = true;
XRPL_ASSERT(
!beforeVault_.empty(), "xrpl::ValidVault::finalize : clawback updated a vault");
auto const& beforeVault = beforeVault_[0];
if (vaultAsset.native() || vaultAsset.getIssuer() != tx[sfAccount])
{
// The owner can use clawback to force-burn shares when the
// vault is empty but there are outstanding shares
if (!(beforeShares && beforeShares->sharesTotal > 0 &&
vaultHoldsNoAssets(beforeVault) && beforeVault.owner == tx[sfAccount]))
{
JLOG(j.fatal()) << //
"Invariant failed: clawback may only be performed "
"by the asset issuer, or by the vault owner of an "
"empty vault";
return false; // That's all we can do
}
}
auto const vaultDeltaAssets = deltaAssets(afterVault.pseudoId);
if (vaultDeltaAssets)
{
if (*vaultDeltaAssets >= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback must decrease vault "
"balance";
result = false;
}
if (beforeVault.assetsTotal + *vaultDeltaAssets != afterVault.assetsTotal)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback and assets outstanding "
"must add up";
result = false;
}
if (beforeVault.assetsAvailable + *vaultDeltaAssets !=
afterVault.assetsAvailable)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback and assets available "
"must add up";
result = false;
}
}
else if (!vaultHoldsNoAssets(beforeVault))
{
JLOG(j.fatal()) << //
"Invariant failed: clawback must change vault balance";
return false; // That's all we can do
}
auto const accountDeltaShares = deltaShares(tx[sfHolder]);
if (!accountDeltaShares)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback must change holder shares";
return false; // That's all we can do
}
if (*accountDeltaShares >= zero)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback must decrease holder "
"shares";
result = false;
}
auto const vaultDeltaShares = deltaShares(afterVault.pseudoId);
if (!vaultDeltaShares || *vaultDeltaShares == zero)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback must change vault shares";
return false; // That's all we can do
}
if (*vaultDeltaShares * -1 != *accountDeltaShares)
{
JLOG(j.fatal()) << //
"Invariant failed: clawback must change holder and "
"vault shares by equal amount";
result = false;
}
return result;
}
case ttLOAN_SET:
case ttLOAN_MANAGE:
case ttLOAN_PAY: {
// TBD
return true;
}
default:
// LCOV_EXCL_START
UNREACHABLE("xrpl::ValidVault::finalize : unknown transaction type");
return false;
// LCOV_EXCL_STOP
}
}();
if (!result)
{
// The comment at the top of this file starting with "assert(enforce)"
// explains this assert.
XRPL_ASSERT(enforce, "xrpl::ValidVault::finalize : vault invariants");
return !enforce;
}
return true;
}
} // namespace xrpl

View File

@@ -1,10 +1,9 @@
#include <xrpl/tx/transactors/PermissionedDomain/PermissionedDomainSet.h>
//
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/STObject.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/PermissionedDomain/PermissionedDomainSet.h>
#include <optional>
namespace xrpl {

View File

@@ -8,7 +8,6 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -132,6 +131,7 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -155,11 +155,11 @@ public:
Json::Value result;
gate g;
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;
@@ -240,27 +240,28 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
Json::Value result;
gate g;
// Test RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));
@@ -268,22 +269,22 @@ public:
// Test RPC::Tuning::max_auto_src_cur source currencies.
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
env.trust(Account("alice")["AUD"](100), "bob");
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));

View File

@@ -1,537 +0,0 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
/**
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
* and JobQueueAwaiter.
*
* Dependency Diagram
* ==================
*
* CoroTask_test
* +-------------------------------------------------+
* | + gate (inner class) : condition_variable helper |
* +-------------------------------------------------+
* | uses
* v
* jtx::Env --> JobQueue::postCoroTask()
* |
* +-- CoroTaskRunner (suspend / post / resume)
* +-- CoroTask<void> / CoroTask<T>
* +-- JobQueueAwaiter
*
* Test Coverage Matrix
* ====================
*
* Test | Primitives exercised
* --------------------------+----------------------------------------------
* testVoidCompletion | CoroTask<void> basic lifecycle
* testCorrectOrder | suspend() -> join() -> post() -> complete
* testIncorrectOrder | post() before suspend() (race-safe path)
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
* testThreadSpecificStorage | LocalValue isolation across coroutines
* testExceptionPropagation | unhandled_exception() in promise_type
* testMultipleYields | N sequential suspend/resume cycles
* testValueReturn | CoroTask<T> co_return value
* testValueException | CoroTask<T> exception via co_await
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
* testShutdownRejection | postCoroTask returns nullptr when stopping
*/
class CoroTask_test : public beast::unit_test::suite
{
public:
/**
* Simple one-shot gate for synchronizing between test thread
* and coroutine worker threads. signal() sets the flag;
* wait_for() blocks until signaled or timeout.
*/
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
/**
* Block until signaled or timeout expires.
*
* @param rel_time Maximum duration to wait
*
* @return true if signaled before timeout
*/
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
/**
* Signal the gate, waking any waiting thread.
*/
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
/**
* CoroTask<void> runs to completion and runner becomes non-runnable.
*/
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* Correct order: suspend, join, post, complete.
* Mirrors existing Coroutine_test::correct_order.
*/
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*rp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
/**
* Incorrect order: post() before suspend(). Verifies the
* race-safe path. Mirrors Coroutine_test::incorrect_order.
*/
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
/**
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
*/
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
*sp = 1;
co_await JobQueueAwaiter{runner};
*sp = 2;
co_await JobQueueAwaiter{runner};
*sp = 3;
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
/**
* Per-coroutine LocalValue isolation. Each coroutine sees its own
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
*/
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTaskTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
/**
* Exception thrown in coroutine body is caught by
* promise_type::unhandled_exception(). Coroutine completes.
*/
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
/**
* Multiple sequential suspend/resume cycles via co_await.
*/
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
*rp = runner;
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> returns a value via co_return. Outer coroutine
* extracts it with co_await.
*/
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
*rp = co_await inner();
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> propagates exceptions from inner coroutines.
* Outer coroutine catches via try/catch around co_await.
*/
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
*cp = true;
}
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> chaining. Nested value-returning coroutines
* compose via co_await.
*/
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [add](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
*rp = co_await mul(3, 4);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
/**
* postCoroTask returns nullptr when JobQueue is stopping.
*/
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -40,11 +40,6 @@ public:
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
void
correct_order()
{
@@ -59,15 +54,13 @@ public:
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> c;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [cp = &c, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*cp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
std::shared_ptr<JobQueue::Coro> c;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
BEAST_EXPECT(g1.wait_for(5s));
c->join();
c->post();
@@ -88,17 +81,11 @@ public:
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [gp = &g](auto runner) -> CoroTask<void> {
// Schedule a resume before suspending. The posted job
// cannot actually call resume() until the current resume()
// releases CoroTaskRunner::mutex_, which only happens after
// the coroutine suspends at co_await.
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
}
@@ -114,7 +101,7 @@ public:
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -131,23 +118,19 @@ public:
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
a[id] = c;
g.signal();
c->yield();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
c->yield();
this->BEAST_EXPECT(**lvp == id);
co_return;
});
this->BEAST_EXPECT(*lv == id);
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}

View File

@@ -43,91 +43,87 @@ class JobQueue_test : public beast::unit_test::suite
}
}
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
void
testPostCoroTask()
testPostCoro()
{
jtx::Env env{*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// Test repeated post()s until the coroutine completes.
// Test repeated post()s until the Coro completes.
std::atomic<int> yieldCount{0};
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest1", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
while (++(*ycp) < 4)
co_await runner->suspend();
co_return;
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest1",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
coroCopy->yield();
});
BEAST_EXPECT(runner != nullptr);
BEAST_EXPECT(coro != nullptr);
// Wait for the Job to run and yield.
while (yieldCount == 0)
;
// Now re-post until the CoroTaskRunner says it is done.
// Now re-post until the Coro says it is done.
int old = yieldCount;
while (runner->runnable())
while (coro->runnable())
{
BEAST_EXPECT(runner->post());
BEAST_EXPECT(coro->post());
while (old == yieldCount)
{
}
runner->join();
coro->join();
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// Test repeated resume()s until the coroutine completes.
// Test repeated resume()s until the Coro completes.
int yieldCount{0};
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest2", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
while (++(*ycp) < 4)
co_await runner->suspend();
co_return;
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest2",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
coroCopy->yield();
});
if (!runner)
if (!coro)
{
// There's no good reason we should not get a runner, but we
// There's no good reason we should not get a Coro, but we
// can't continue without one.
BEAST_EXPECT(false);
return;
}
// Wait for the Job to run and yield.
runner->join();
coro->join();
// Now resume until the CoroTaskRunner says it is done.
// Now resume until the Coro says it is done.
int old = yieldCount;
while (runner->runnable())
while (coro->runnable())
{
runner->resume(); // Resume runs synchronously on this thread.
coro->resume(); // Resume runs synchronously on this thread.
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// If the JobQueue is stopped, we should no
// longer be able to post a coroutine (and calling postCoroTask()
// should return nullptr).
// longer be able to add a Coro (and calling postCoro() should
// return false).
using namespace std::chrono_literals;
jQueue.stop();
// The coroutine should never run, so having it access this
// The Coro should never run, so having the Coro access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest3", [up = &unprotected](auto) -> CoroTask<void> {
*up = false;
co_return;
auto const coro = jQueue.postCoro(
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
unprotected = false;
});
BEAST_EXPECT(runner == nullptr);
BEAST_EXPECT(coro == nullptr);
}
}
@@ -136,7 +132,7 @@ public:
run() override
{
testAddJob();
testPostCoroTask();
testPostCoro();
}
};

View File

@@ -6,7 +6,6 @@
#include <xrpld/rpc/RPCHandler.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
@@ -194,6 +193,7 @@ AMMTest::find_paths_request(
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
Json::Value result;
gate g;
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;

View File

@@ -1425,6 +1425,7 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
c,
Role::ADMIN,
{},
{},
RPC::apiMaximumSupportedVersion},
jvCommand};

View File

@@ -3,7 +3,6 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/resource/Fees.h>
namespace xrpl {
@@ -100,14 +99,13 @@ GRPCServerImpl::CallData<Request, Response>::process()
// ensures that finished is always true when this CallData object
// is returned as a tag in handleRpcs(), after sending the response
finished_ = true;
auto runner = app_.getJobQueue().postCoroTask(
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
thisShared->processRequest();
co_return;
auto coro = app_.getJobQueue().postCoro(
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
thisShared->process(coro);
});
// If runner is null, then the JobQueue has already been shutdown
if (!runner)
// If coro is null, then the JobQueue has already been shutdown
if (!coro)
{
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
responder_.FinishWithError(status, this);
@@ -116,7 +114,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::processRequest()
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
{
try
{
@@ -158,6 +156,7 @@ GRPCServerImpl::CallData<Request, Response>::processRequest()
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
request_};

View File

@@ -206,12 +206,9 @@ private:
clone() override;
private:
/**
* Process the gRPC request. Called inside the CoroTask lambda
* posted to the JobQueue by process().
*/
// process the request. Called inside the coroutine passed to JobQueue
void
processRequest();
process(std::shared_ptr<JobQueue::Coro> coro);
// return load type of this RPC
Resource::Charge

View File

@@ -3,6 +3,7 @@
#include <xrpld/rpc/Role.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/server/InfoSub.h>
namespace xrpl {
@@ -23,6 +24,7 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobQueue::Coro> coro{};
InfoSub::pointer infoSub{};
unsigned int apiVersion;
};

View File

@@ -169,10 +169,13 @@ public:
private:
Json::Value
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
void
processSession(std::shared_ptr<Session> const&);
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
void
processRequest(
@@ -180,6 +183,7 @@ private:
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&&,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user);

View File

@@ -14,7 +14,6 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
@@ -285,10 +284,9 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
processSession(detachedSession);
co_return;
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
});
if (postResult == nullptr)
{
@@ -324,18 +322,17 @@ ServerHandler::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoroTask(
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
auto const jr = this->processSession(session, jv);
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
co_return;
});
if (postResult == nullptr)
{
@@ -376,7 +373,10 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
@@ -443,6 +443,7 @@ ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::V
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
@@ -513,14 +514,18 @@ ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::V
return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(std::shared_ptr<Session> const& session)
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
@@ -557,6 +562,7 @@ ServerHandler::processRequest(
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
{
@@ -813,6 +819,7 @@ ServerHandler::processRequest(
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,

View File

@@ -7,9 +7,6 @@
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/resource/Fees.h>
#include <condition_variable>
#include <mutex>
namespace xrpl {
// This interface is deprecated.
@@ -40,31 +37,98 @@ doRipplePathFind(RPC::JsonContext& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// makeLegacyPathRequest enqueues a path-finding job that runs
// asynchronously. We block this thread with a condition_variable
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&]() {
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
{
std::lock_guard lk(mtx);
pathDone = true;
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
cv.notify_one();
},
context.consumer,
lpLedger,
context.params);
if (request)
{
std::unique_lock lk(mtx);
cv.wait(lk, [&] { return pathDone; });
context.coro->yield();
jvResult = request->doStatus(context.params);
}