Compare commits

..

9 Commits

Author SHA1 Message Date
Pratik Mankawde
6b4738f26a feat: Migrate coroutine tests from Boost.Coroutine to C++20 coroutines
Migrate Coroutine_test and JobQueue_test from Boost.Coroutine to
C++20 std::coroutine using CoroTask/CoroTaskRunner:

- Coroutine_test: Replace Coro-based coroutine tests with CoroTask
  equivalents using co_await runner->yieldAndPost().
- JobQueue_test: Replace Coro suspend/resume patterns with CoroTask
  equivalents, use pointer-by-value captures in coroutine lambdas
  to avoid dangling reference issues.
2026-03-11 11:38:12 +00:00
Pratik Mankawde
3df5b06026 feat: Migrate production entry points from Boost.Coroutine to C++20 coroutines
Migrate all production coroutine entry points from Boost.Coroutine
to C++20 std::coroutine using the CoroTask/CoroTaskRunner primitives:

- RipplePathFind: Replace Coro suspend/resume with co_await pattern,
  add cv timeout for graceful shutdown.
- ServerHandler: Replace Coro-based processRequest with CoroTask,
  simplify coroutine lifecycle management.
- GRPCServer: Replace Coro with CoroTask for streaming RPC handlers.
- Remove Coro usage from Context.h aggregate initialization.
- Add exception handling in coroutine bodies to prevent unhandled
  exceptions from escaping the coroutine frame.
2026-03-11 11:38:06 +00:00
Pratik Mankawde
b950ad4b0f feat: Add C++20 coroutine primitives: CoroTask, CoroTaskRunner, JobQueueAwaiter
Add C++20 std::coroutine based task primitives for the JobQueue:

- CoroTask<T>: A coroutine return type with RAII ownership semantics
  and symmetric transfer for efficient resumption.
- CoroTaskRunner: Manages coroutine lifecycle on the JobQueue with
  suspend/resume tracking, LocalValue preservation, and graceful
  shutdown support.
- JobQueueAwaiter: External awaiter combining yield+post atomically.
- yieldAndPost(): Inline awaiter workaround for GCC-12 codegen bug
  where external awaiters at multiple co_await points corrupt the
  coroutine state machine resume index.
- CoroTask_test: Comprehensive test suite covering task lifecycle,
  suspend/resume, shutdown, and value-returning coroutines.
- BoostToStdCoroutineSwitchPlan.md: Migration plan documentation.
2026-03-11 11:38:00 +00:00
Bart
3baf5454f2 ci: Only upload artifacts in the XRPLF/rippled repository (#6523)
This change will only attempt to upload artifacts for CI runs performed in the XRPLF/rippled repository.
2026-03-11 11:48:40 +01:00
Michael Legleux
24a5cbaa93 chore: Build voidstar on amd64 only (#6481)
* chore: Build voidstar on amd64 only

* fatal error if configuring voidstar on  non x86
2026-03-10 23:59:43 +00:00
Michael Legleux
eb7c8c6c7a chore: Use CMake components for install (#6485)
* chore: Use components for install

* rm CMake export targets

* reformat
2026-03-10 23:38:43 +00:00
Alex Kremer
f27d8f3890 chore: Enable clang-tidy bugprone-inc-dec-in-conditions check (#6455) 2026-03-10 20:12:15 +00:00
Alex Kremer
8345cd77df chore: Enable clang-tidy bugprone-unused-raii check (#6505) 2026-03-10 19:48:56 +00:00
Alex Kremer
c38aabdaee chore: Enable clang-tidy bugprone-unhandled-self-assignment check (#6504) 2026-03-10 17:42:49 +00:00
69 changed files with 6746 additions and 2955 deletions

View File

@@ -14,6 +14,7 @@ Checks: "-*,
bugprone-fold-init-type,
bugprone-forward-declaration-namespace,
bugprone-inaccurate-erase,
bugprone-inc-dec-in-conditions,
bugprone-incorrect-enable-if,
bugprone-incorrect-roundings,
bugprone-infinite-loop,
@@ -64,8 +65,10 @@ Checks: "-*,
bugprone-undefined-memory-manipulation,
bugprone-undelegated-constructor,
bugprone-unhandled-exception-at-new,
bugprone-unhandled-self-assignment,
bugprone-unique-ptr-array-mismatch,
bugprone-unsafe-functions,
bugprone-unused-raii,
bugprone-unused-local-non-trivial-variable,
bugprone-virtual-near-miss,
cppcoreguidelines-no-suspend-with-lock,
@@ -95,12 +98,10 @@ Checks: "-*,
# checks that have some issues that need to be resolved:
#
# bugprone-crtp-constructor-accessibility,
# bugprone-inc-dec-in-conditions,
# bugprone-move-forwarding-reference,
# bugprone-switch-missing-default-case,
# bugprone-unused-return-value,
# bugprone-use-after-move,
# bugprone-unhandled-self-assignment,
# bugprone-unused-raii,
#
# cppcoreguidelines-misleading-capture-default-by-value,

View File

@@ -55,7 +55,7 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
# fee to 500.
# - Bookworm using GCC 15: Debug on linux/amd64, enable code
# coverage (which will be done below).
# - Bookworm using Clang 16: Debug on linux/arm64, enable voidstar.
# - Bookworm using Clang 16: Debug on linux/amd64, enable voidstar.
# - Bookworm using Clang 17: Release on linux/amd64, set the
# reference fee to 1000.
# - Bookworm using Clang 20: Debug on linux/amd64.
@@ -78,7 +78,7 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
if (
f"{os['compiler_name']}-{os['compiler_version']}" == "clang-16"
and build_type == "Debug"
and architecture["platform"] == "linux/arm64"
and architecture["platform"] == "linux/amd64"
):
cmake_args = f"-Dvoidstar=ON {cmake_args}"
skip = False

View File

@@ -141,9 +141,8 @@ jobs:
needs:
- should-run
- build-test
# Only run when committing to a PR that targets a release branch in the
# XRPLF repository.
if: ${{ github.repository_owner == 'XRPLF' && needs.should-run.outputs.go == 'true' && startsWith(github.ref, 'refs/heads/release') }}
# Only run when committing to a PR that targets a release branch.
if: ${{ github.repository == 'XRPLF/rippled' && needs.should-run.outputs.go == 'true' && startsWith(github.ref, 'refs/heads/release') }}
uses: ./.github/workflows/reusable-upload-recipe.yml
secrets:
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}

View File

@@ -17,8 +17,7 @@ defaults:
jobs:
upload-recipe:
# Only run when a tag is pushed to the XRPLF repository.
if: ${{ github.repository_owner == 'XRPLF' }}
if: ${{ github.repository == 'XRPLF/rippled' }}
uses: ./.github/workflows/reusable-upload-recipe.yml
secrets:
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}

View File

@@ -92,8 +92,8 @@ jobs:
upload-recipe:
needs: build-test
# Only run when pushing to the develop branch in the XRPLF repository.
if: ${{ github.repository_owner == 'XRPLF' && github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
# Only run when pushing to the develop branch.
if: ${{ github.repository == 'XRPLF/rippled' && github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
uses: ./.github/workflows/reusable-upload-recipe.yml
secrets:
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}

View File

@@ -176,7 +176,7 @@ jobs:
fi
- name: Upload the binary (Linux)
if: ${{ github.repository_owner == 'XRPLF' && runner.os == 'Linux' }}
if: ${{ github.repository == 'XRPLF/rippled' && runner.os == 'Linux' }}
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: xrpld-${{ inputs.config_name }}
@@ -266,7 +266,7 @@ jobs:
--target coverage
- name: Upload coverage report
if: ${{ github.repository_owner == 'XRPLF' && !inputs.build_only && env.COVERAGE_ENABLED == 'true' }}
if: ${{ github.repository == 'XRPLF/rippled' && !inputs.build_only && env.COVERAGE_ENABLED == 'true' }}
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
with:
disable_search: true

View File

@@ -103,11 +103,11 @@ jobs:
sanitizers: ${{ matrix.sanitizers }}
- name: Log into Conan remote
if: ${{ github.repository_owner == 'XRPLF' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
if: ${{ github.repository == 'XRPLF/rippled' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
run: conan remote login "${CONAN_REMOTE_NAME}" "${{ secrets.CONAN_REMOTE_USERNAME }}" --password "${{ secrets.CONAN_REMOTE_PASSWORD }}"
- name: Upload Conan packages
if: ${{ github.repository_owner == 'XRPLF' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
if: ${{ github.repository == 'XRPLF/rippled' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
env:
FORCE_OPTION: ${{ github.event.inputs.force_upload == 'true' && '--force' || '' }}
run: conan upload "*" --remote="${CONAN_REMOTE_NAME}" --confirm ${FORCE_OPTION}

File diff suppressed because it is too large Load Diff

View File

@@ -131,7 +131,6 @@ if(coverage)
include(XrplCov)
endif()
set(PROJECT_EXPORT_SET XrplExports)
include(XrplCore)
include(XrplInstall)
include(XrplValidatorKeys)

View File

@@ -1,60 +0,0 @@
include(CMakeFindDependencyMacro)
# need to represent system dependencies of the lib here
#[=========================================================[
Boost
#]=========================================================]
if(static OR APPLE OR MSVC)
set(Boost_USE_STATIC_LIBS ON)
endif()
set(Boost_USE_MULTITHREADED ON)
if(static OR MSVC)
set(Boost_USE_STATIC_RUNTIME ON)
else()
set(Boost_USE_STATIC_RUNTIME OFF)
endif()
find_dependency(
Boost
COMPONENTS
chrono
container
context
coroutine
date_time
filesystem
program_options
regex
system
thread
)
#[=========================================================[
OpenSSL
#]=========================================================]
if(NOT DEFINED OPENSSL_ROOT_DIR)
if(DEFINED ENV{OPENSSL_ROOT})
set(OPENSSL_ROOT_DIR $ENV{OPENSSL_ROOT})
elseif(APPLE)
find_program(homebrew brew)
if(homebrew)
execute_process(
COMMAND ${homebrew} --prefix openssl
OUTPUT_VARIABLE OPENSSL_ROOT_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
endif()
endif()
file(TO_CMAKE_PATH "${OPENSSL_ROOT_DIR}" OPENSSL_ROOT_DIR)
endif()
if(static OR APPLE OR MSVC)
set(OPENSSL_USE_STATIC_LIBS ON)
endif()
set(OPENSSL_MSVC_STATIC_RT ON)
find_dependency(OpenSSL REQUIRED)
find_dependency(ZLIB)
find_dependency(date)
if(TARGET ZLIB::ZLIB)
set_target_properties(
OpenSSL::Crypto
PROPERTIES INTERFACE_LINK_LIBRARIES ZLIB::ZLIB
)
endif()

View File

@@ -2,100 +2,38 @@
install stuff
#]===================================================================]
include(create_symbolic_link)
include(GNUInstallDirs)
# If no suffix is defined for executables (e.g. Windows uses .exe but Linux
# and macOS use none), then explicitly set it to the empty string.
if(NOT DEFINED suffix)
set(suffix "")
if(is_root_project AND TARGET xrpld)
install(
TARGETS xrpld
RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}" COMPONENT runtime
)
install(
FILES "${CMAKE_CURRENT_SOURCE_DIR}/cfg/xrpld-example.cfg"
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}/xrpld"
RENAME xrpld.cfg
COMPONENT runtime
)
install(
FILES "${CMAKE_CURRENT_SOURCE_DIR}/cfg/validators-example.txt"
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}/xrpld"
RENAME validators.txt
COMPONENT runtime
)
endif()
install(
TARGETS
common
opts
xrpl_boost
xrpl_libs
xrpl_syslibs
xrpl.imports.main
xrpl.libpb
xrpl.libxrpl
xrpl.libxrpl.basics
xrpl.libxrpl.beast
xrpl.libxrpl.conditions
xrpl.libxrpl.core
xrpl.libxrpl.crypto
xrpl.libxrpl.git
xrpl.libxrpl.json
xrpl.libxrpl.rdb
xrpl.libxrpl.ledger
xrpl.libxrpl.net
xrpl.libxrpl.nodestore
xrpl.libxrpl.protocol
xrpl.libxrpl.resource
xrpl.libxrpl.server
xrpl.libxrpl.shamap
xrpl.libxrpl.tx
antithesis-sdk-cpp
EXPORT XrplExports
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
TARGETS xrpl.libpb xrpl.libxrpl
LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" COMPONENT development
ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" COMPONENT development
RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}" COMPONENT development
)
install(
DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/include/xrpl"
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}"
)
install(
EXPORT XrplExports
FILE XrplTargets.cmake
NAMESPACE Xrpl::
DESTINATION lib/cmake/xrpl
)
include(CMakePackageConfigHelpers)
write_basic_package_version_file(
XrplConfigVersion.cmake
VERSION ${xrpld_version}
COMPATIBILITY SameMajorVersion
)
if(is_root_project AND TARGET xrpld)
install(TARGETS xrpld RUNTIME DESTINATION bin)
set_target_properties(xrpld PROPERTIES INSTALL_RPATH_USE_LINK_PATH ON)
# sample configs should not overwrite existing files
# install if-not-exists workaround as suggested by
# https://cmake.org/Bug/view.php?id=12646
install(
CODE
"
macro (copy_if_not_exists SRC DEST NEWNAME)
if (NOT EXISTS \"\$ENV{DESTDIR}\${CMAKE_INSTALL_PREFIX}/\${DEST}/\${NEWNAME}\")
file (INSTALL FILE_PERMISSIONS OWNER_READ OWNER_WRITE DESTINATION \"\${CMAKE_INSTALL_PREFIX}/\${DEST}\" FILES \"\${SRC}\" RENAME \"\${NEWNAME}\")
else ()
message (\"-- Skipping : \$ENV{DESTDIR}\${CMAKE_INSTALL_PREFIX}/\${DEST}/\${NEWNAME}\")
endif ()
endmacro()
copy_if_not_exists(\"${CMAKE_CURRENT_SOURCE_DIR}/cfg/xrpld-example.cfg\" etc xrpld.cfg)
copy_if_not_exists(\"${CMAKE_CURRENT_SOURCE_DIR}/cfg/validators-example.txt\" etc validators.txt)
"
)
install(
CODE
"
set(CMAKE_MODULE_PATH \"${CMAKE_MODULE_PATH}\")
include(create_symbolic_link)
create_symbolic_link(xrpld${suffix} \
\$ENV{DESTDIR}\${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_BINDIR}/rippled${suffix})
"
)
endif()
install(
FILES
${CMAKE_CURRENT_SOURCE_DIR}/cmake/XrplConfig.cmake
${CMAKE_CURRENT_BINARY_DIR}/XrplConfigVersion.cmake
DESTINATION lib/cmake/xrpl
COMPONENT development
)

View File

@@ -50,6 +50,13 @@ if(MSVC AND CMAKE_GENERATOR_PLATFORM STREQUAL "Win32")
message(FATAL_ERROR "Visual Studio 32-bit build is not supported.")
endif()
if(voidstar AND NOT is_amd64)
message(
FATAL_ERROR
"The voidstar library only supported on amd64/x86_64. Detected archictecture was: ${CMAKE_SYSTEM_PROCESSOR}"
)
endif()
if(APPLE AND NOT HOMEBREW)
find_program(HOMEBREW brew)
endif()

View File

@@ -71,12 +71,14 @@ words:
- coldwallet
- compr
- conanfile
- cppcoro
- conanrun
- confs
- connectability
- coro
- coros
- cowid
- cppcoro
- cryptocondition
- cryptoconditional
- cryptoconditions
@@ -99,11 +101,14 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fcontext
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -187,6 +192,7 @@ words:
- ostr
- pargs
- partitioner
- pratik
- paychan
- paychans
- permdex
@@ -194,6 +200,7 @@ words:
- permissioned
- pointee
- populator
- pratik
- preauth
- preauthorization
- preauthorize
@@ -208,6 +215,7 @@ words:
- queuable
- Raphson
- replayer
- repost
- rerere
- retriable
- RIPD
@@ -238,6 +246,7 @@ words:
- soci
- socidb
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue

View File

@@ -0,0 +1,699 @@
#pragma once
#include <xrpl/beast/utility/instrumentation.h>
#include <coroutine>
#include <exception>
#include <type_traits>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
/**
* CoroTask<void> -- coroutine return type for void-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<void>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - exception_ : std::exception_ptr |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_void() |
* | + unhandled_exception() |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter
* +-----------------------------------------------+
* | await_suspend(h): |
* | if continuation_ set -> symmetric transfer |
* | else -> noop_coroutine |
* +-----------------------------------------------+
*
* Design Notes
* ------------
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
* body does not execute until the handle is explicitly resumed.
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
* of void/bool, allowing the scheduler to jump directly to the next
* coroutine without growing the call stack.
* - Continuation chaining: when one CoroTask is co_await-ed inside
* another, the caller's handle is stored as continuation_ so
* FinalAwaiter can resume it when this task finishes.
* - Move-only: the handle is exclusively owned; copy is deleted.
*
* Usage Examples
* ==============
*
* 1. Basic void coroutine (the most common case in rippled):
*
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
* // do something
* co_await runner->suspend(); // yield control
* // resumed later via runner->post() or runner->resume()
* co_return;
* }
*
* 2. co_await-ing one CoroTask<void> from another (chaining):
*
* CoroTask<void> inner() {
* // ...
* co_return;
* }
* CoroTask<void> outer() {
* co_await inner(); // continuation_ links outer -> inner
* co_return; // FinalAwaiter resumes outer
* }
*
* 3. Exceptions propagate through co_await:
*
* CoroTask<void> failing() {
* throw std::runtime_error("oops");
* co_return;
* }
* CoroTask<void> caller() {
* try { co_await failing(); }
* catch (std::runtime_error const&) { // caught here }
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Dangling references in coroutine parameters.
* Coroutine parameters are copied into the frame, but references
* are NOT -- they are stored as-is. If the referent goes out of scope
* before the coroutine finishes, you get use-after-free.
*
* // BROKEN -- local dies before coroutine runs:
* CoroTask<void> bad(int& ref) { co_return; }
* void launch() {
* int local = 42;
* auto task = bad(local); // frame stores &local
* } // local destroyed; frame holds dangling ref
*
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
*
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
* When a lambda that returns CoroTask captures by reference ([&]),
* GCC 14 may generate a corrupted coroutine frame. Always capture
* by explicit pointer-to-value instead:
*
* // BROKEN on GCC 14:
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
*
* // FIX -- capture pointers explicitly:
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
*
* BUG-RISK: Resuming a destroyed or completed CoroTask.
* Calling handle().resume() after the coroutine has already run to
* completion (done() == true) is undefined behavior. The CoroTaskRunner
* guards against this with an XRPL_ASSERT, but standalone usage of
* CoroTask must check done() before resuming.
*
* BUG-RISK: Moving a CoroTask that is being awaited.
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
* or destroying A will invalidate the continuation link. Never move
* or reassign a CoroTask while it is mid-execution or being awaited.
*
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
* There is no built-in notification when the coroutine finishes.
* The caller must use external synchronization (e.g. CoroTaskRunner::join
* or a gate/condition_variable) to know when it is done.
*
* LIMITATION: No cancellation token.
* There is no way to cancel a suspended CoroTask from outside. The
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
* after each co_await and co_return early if needed.
*
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
* If a coroutine calls a regular function that wants to "yield", it
* cannot. Only the immediate coroutine body can use co_await.
* This is acceptable for rippled because all yield() sites are shallow.
*/
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise. Compiler uses this to manage coroutine state.
* Stores the exception (if any) and the continuation handle for
* symmetric transfer back to the awaiting coroutine.
*/
struct promise_type
{
// Captured exception from the coroutine body, rethrown in
// await_resume() when this task is co_await-ed by a caller.
std::exception_ptr exception_;
// Handle to the coroutine that is co_await-ing this task.
// Set by await_suspend(). FinalAwaiter uses it for symmetric
// transfer back to the caller. Null if this is a top-level task.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. The coroutine body does not execute until the
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Awaiter returned by final_suspend(). Uses symmetric transfer:
* if a continuation exists, transfers control directly to it
* (tail-call, no stack growth). Otherwise returns noop_coroutine
* so the coroutine frame stays alive for the owner to destroy.
*/
struct FinalAwaiter
{
/**
* Always false. We need await_suspend to run for
* symmetric transfer.
*/
bool
await_ready() noexcept
{
return false;
}
/**
* Symmetric transfer: returns the continuation handle so
* the compiler emits a tail-call instead of a nested resume.
* If no continuation is set, returns noop_coroutine to
* suspend at final_suspend without destroying the frame.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
/**
* Returns FinalAwaiter for symmetric transfer at coroutine end.
*/
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return;` (void coroutine).
*/
void
return_void()
{
}
/**
* Called by the compiler when an exception escapes the coroutine
* body. Captures it for later rethrowing in await_resume().
*/
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `co_await someCoroTask;` --
/**
* Always false. This task is lazy, so co_await always suspends
* the caller to set up the continuation link.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores the caller's handle as our continuation, then returns
* our handle for symmetric transfer (caller suspends, we resume).
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<void>::await_suspend : handle is valid");
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
/**
* Called in the awaiting coroutine's context after this task
* completes. Rethrows any exception captured by
* unhandled_exception().
*/
void
await_resume()
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<void>::await_resume : handle is valid");
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
/**
* CoroTask<T> -- coroutine return type for value-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<T>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - result_ : variant<monostate, T, |
* | exception_ptr> |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_value(T) -> stores in result_[1] |
* | + unhandled_exception -> stores in result_[2] |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
*
* Value Extraction
* ----------------
* await_resume() inspects the variant:
* - index 2 (exception_ptr) -> rethrow
* - index 1 (T) -> return value via move
*
* Usage Examples
* ==============
*
* 1. Simple value return:
*
* CoroTask<int> computeAnswer() { co_return 42; }
*
* CoroTask<void> caller() {
* int v = co_await computeAnswer(); // v == 42
* }
*
* 2. Chaining value-returning coroutines:
*
* CoroTask<int> add(int a, int b) { co_return a + b; }
* CoroTask<int> doubleSum(int a, int b) {
* int s = co_await add(a, b);
* co_return s * 2;
* }
*
* 3. Exception propagation from inner to outer:
*
* CoroTask<int> failing() {
* throw std::runtime_error("bad");
* co_return 0; // never reached
* }
* CoroTask<void> caller() {
* try {
* int v = co_await failing(); // throws here
* } catch (std::runtime_error const& e) {
* // e.what() == "bad"
* }
* }
*
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
* ================================================================
*
* BUG-RISK: await_resume() moves the value out of the variant.
* Calling co_await on the same CoroTask<T> instance twice is undefined
* behavior -- the second call will see a moved-from T. CoroTask is
* single-shot: one co_return, one co_await.
*
* BUG-RISK: T must be move-constructible.
* return_value(T) takes by value and moves into the variant.
* Types that are not movable cannot be used as T.
*
* LIMITATION: No co_yield support.
* CoroTask<T> only supports a single co_return. It does not implement
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
* compile error. For streaming values, a different return type
* (e.g. Generator<T>) would be needed.
*
* LIMITATION: Result is only accessible via co_await.
* There is no .get() or .result() method. The value can only be
* extracted by co_await-ing the CoroTask<T> from inside another
* coroutine. For extracting results in non-coroutine code, pass a
* pointer to the caller and write through it (as the tests do).
*/
template <typename T>
class CoroTask
{
static_assert(
std::is_move_constructible_v<T>,
"CoroTask<T> requires T to be move-constructible");
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise for value-returning coroutines.
* Stores the result as a variant: monostate (not yet set),
* T (co_return value), or exception_ptr (unhandled exception).
*/
struct promise_type
{
// Tri-state result:
// index 0 (monostate) -- coroutine has not yet completed
// index 1 (T) -- co_return value stored here
// index 2 (exception) -- unhandled exception captured here
std::variant<std::monostate, T, std::exception_ptr> result_;
// Handle to the coroutine co_await-ing this task. Used by
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. Coroutine body does not run until explicitly resumed.
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Symmetric-transfer awaiter at coroutine completion.
* Same pattern as CoroTask<void>::FinalAwaiter.
*/
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
/**
* Returns continuation for symmetric transfer, or
* noop_coroutine if this is a top-level task.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return value;`.
* Moves the value into result_ at index 1.
*
* @param value The value to store
*/
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
/**
* Captures unhandled exceptions at index 2 of result_.
* Rethrown later in await_resume().
*/
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
/**
* Always false. co_await always suspends to set up continuation.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores caller as continuation, returns our handle for
* symmetric transfer.
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<T>::await_suspend : handle is valid");
handle_.promise().continuation_ = caller;
return handle_;
}
/**
* Extracts the result: rethrows if exception, otherwise moves
* the T value out of the variant. Single-shot: calling twice
* on the same task is undefined (moved-from T).
*
* @return The co_return-ed value
*/
T
await_resume()
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<T>::await_resume : handle is valid");
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
} // namespace xrpl

View File

@@ -0,0 +1,374 @@
#pragma once
/**
* @file CoroTaskRunner.ipp
*
* CoroTaskRunner inline implementation.
*
* This file contains the business logic for managing C++20 coroutines
* on the JobQueue. It is included at the bottom of JobQueue.h.
*
* Data Flow: suspend / post / resume cycle
* =========================================
*
* coroutine body CoroTaskRunner JobQueue
* -------------- -------------- --------
* |
* co_await runner->suspend()
* |
* +--- await_suspend ------> onSuspend()
* | ++nSuspend_ ------------> nSuspend_
* | [coroutine is now suspended]
* |
* . (externally or by yieldAndPost())
* .
* +--- (caller calls) -----> post()
* | ++runCount_
* | addJob(resume) ----------> job enqueued
* | |
* | [worker picks up]
* | |
* +--- <----- resume() <-----------------------------------+
* | --nSuspend_ ------> nSuspend_
* | swap in LocalValues (lvs_)
* | task_.handle().resume()
* | |
* | [coroutine body continues here]
* | |
* | swap out LocalValues
* | --runCount_
* | cv_.notify_all()
* v
*
* Thread Safety
* =============
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
* races cannot resume the coroutine while it is still running.
* (See the race condition discussion in JobQueue.h)
* - mutex_run_ : guards runCount_ counter; used by join() to wait until
* all in-flight resume operations complete.
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
*
* Common Mistakes When Modifying This File
* =========================================
*
* 1. Changing lock ordering.
* resume() acquires locks sequentially (never held simultaneously):
* jq_.m_mutex (released immediately), then mutex_ (held across resume),
* then mutex_run_ (released after decrement). post() acquires only
* mutex_run_. Any new code path must follow the same order.
*
* 2. Removing the shared_from_this() capture in post().
* The lambda passed to addJob captures [this, sp = shared_from_this()].
* If you remove sp, 'this' can be destroyed before the job runs,
* causing use-after-free. The sp capture is load-bearing.
*
* 3. Forgetting to decrement nSuspend_ on a new code path.
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
* suspension path (e.g. a new awaiter) and forget to decrement on resume
* or on failure, JobQueue::stop() will hang.
*
* 4. Calling task_.handle().resume() without holding mutex_.
* This allows a race where the coroutine runs on two threads
* simultaneously. Always hold mutex_ around resume().
*
* 5. Swapping LocalValues outside of the mutex_ critical section.
* The swap-in and swap-out of LocalValues must bracket the resume()
* call. If you move the swap-out before the lock_guard(mutex_) is
* released, you break LocalValue isolation for any code that runs
* after the coroutine suspends but before the lock is dropped.
*/
namespace xrpl {
/**
* Construct a CoroTaskRunner. Sets runCount_ to 0; does not
* create the coroutine. Call init() afterwards.
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), runCount_(0)
{
}
/**
* Initialize with a coroutine-returning callable.
* Stores the callable on the heap (FuncStore) so it outlives the
* coroutine frame. Coroutine frames store a reference to the
* callable's implicit object parameter (the lambda). If the callable
* is a temporary, that reference dangles after the caller returns.
* Keeping the callable alive here ensures the coroutine's captures
* remain valid.
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
using Fn = std::decay_t<F>;
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
task_ = store->func(shared_from_this());
storedFunc_ = std::move(store);
}
/**
* Destructor. Waits for any in-flight resume() to complete, then
* asserts (debug) that the coroutine has finished or
* expectEarlyExit() was called.
*
* The join() call is necessary because with async dispatch the
* coroutine runs on a worker thread. The gate signal (which wakes
* the test thread) can arrive before resume() has set finished_.
* join() synchronizes via mutex_run_, establishing a happens-before
* edge: finished_ = true -> unlock(mutex_run_) in resume() ->
* lock(mutex_run_) in join() -> read finished_.
*/
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
join();
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
*/
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
/**
* Decrement nSuspend_ without resuming.
*/
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
/**
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
* before the coroutine actually suspends. The caller must later call
* post() or resume() to continue execution.
*
* @return Awaiter for use with `co_await runner->suspend()`
*/
inline auto
JobQueue::CoroTaskRunner::suspend()
{
/**
* Custom awaiter for suspend(). Always suspends (await_ready
* returns false) and increments nSuspend_ in await_suspend().
*/
struct SuspendAwaiter
{
CoroTaskRunner& runner_; // The runner that owns this coroutine.
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Called when the coroutine suspends. Increments nSuspend_
* so the JobQueue knows a coroutine is waiting.
*/
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
/**
* Suspend and immediately repost on the JobQueue. Equivalent to
* `co_await JobQueueAwaiter{runner}` but uses an inline struct
* to work around a GCC-12 codegen bug (see declaration in JobQueue.h).
*
* If the JobQueue is stopping (post fails), the suspend count is
* undone and the coroutine is resumed immediately via h.resume().
*
* @return An inline YieldPostAwaiter
*/
inline auto
JobQueue::CoroTaskRunner::yieldAndPost()
{
struct YieldPostAwaiter
{
CoroTaskRunner& runner_;
bool
await_ready() const noexcept
{
return false;
}
void
await_suspend(std::coroutine_handle<> h)
{
runner_.onSuspend();
if (!runner_.post())
{
runner_.onUndoSuspend();
h.resume();
}
}
void
await_resume() const noexcept
{
}
};
return YieldPostAwaiter{*this};
}
/**
* Schedule coroutine resumption as a job on the JobQueue.
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
* destroyed while the job is queued but not yet executed.
*
* @return false if the JobQueue rejected the job (shutting down)
*/
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
++runCount_;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Undo the runCount_ increment.
std::lock_guard lk(mutex_run_);
--runCount_;
cv_.notify_all();
return false;
}
/**
* Resume the coroutine on the current thread.
*
* Steps:
* 1. Decrement nSuspend_ (under jq_.m_mutex)
* 2. Swap in this coroutine's LocalValues for thread-local isolation
* 3. Resume the coroutine handle (under mutex_)
* 4. Swap out LocalValues, restoring the thread's previous state
* 5. Decrement runCount_ and notify join() waiters
*
* @pre post() must have been called before resume(). Direct calls
* without a prior post() will corrupt runCount_ and break join().
* Note: runCount_ is NOT incremented here — post() already did that.
* This ensures join() stays blocked for the entire post->resume lifetime.
*/
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(
task_.handle() && !task_.done(),
"xrpl::JobQueue::CoroTaskRunner::resume : task handle is valid and not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
if (task_.done())
{
finished_ = true;
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// Use std::move (not task_ = {}) so task_.handle_ is null BEFORE the
// frame is destroyed. operator= would destroy the frame while handle_
// still holds the old value -- a re-entrancy hazard on GCC-12 if
// frame destruction triggers runner cleanup.
[[maybe_unused]] auto completed = std::move(task_);
}
std::lock_guard lk(mutex_run_);
--runCount_;
cv_.notify_all();
}
/**
* @return true if the coroutine has not yet run to completion
*/
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
// After normal completion, task_ is reset to break the shared_ptr cycle
// (handle_ becomes null). A null handle means the coroutine is done.
return task_.handle() && !task_.done();
}
/**
* Handle early termination when the coroutine never ran (e.g. JobQueue
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
*/
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
if (!finished_)
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
finished_ = true;
}
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Use std::move (not task_ = {}) so
// task_.handle_ is null before the frame is destroyed.
{
[[maybe_unused]] auto completed = std::move(task_);
}
storedFunc_.reset();
}
/**
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0 or
* finished_ becomes true. The finished_ check handles the case
* where resume() is called directly (without post()), which
* decrements runCount_ below zero. In that scenario runCount_
* never returns to 0, but finished_ becoming true guarantees
* the coroutine is done and no more resumes will occur.
*/
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return runCount_ == 0 || finished_; });
}
} // namespace xrpl

View File

@@ -2,6 +2,7 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
@@ -9,6 +10,7 @@
#include <boost/coroutine/all.hpp>
#include <coroutine>
#include <set>
namespace xrpl {
@@ -119,6 +121,419 @@ public:
join();
};
/** C++20 coroutine lifecycle manager. Replaces Coro for new code.
*
* Class / Inheritance / Dependency Diagram
* =========================================
*
* std::enable_shared_from_this<CoroTaskRunner>
* ^
* | (public inheritance)
* |
* CoroTaskRunner
* +---------------------------------------------------+
* | - lvs_ : detail::LocalValues |
* | - jq_ : JobQueue& |
* | - type_ : JobType |
* | - name_ : std::string |
* | - runCount_ : int (in-flight resumes) |
* | - mutex_ : std::mutex (coroutine guard) |
* | - mutex_run_ : std::mutex (join guard) |
* | - cv_ : condition_variable |
* | - task_ : CoroTask<void> |
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
* +---------------------------------------------------+
* | + init(F&&) : set up coroutine callable |
* | + onSuspend() : ++jq_.nSuspend_ |
* | + onUndoSuspend() : --jq_.nSuspend_ |
* | + suspend() : returns SuspendAwaiter |
* | + post() : schedule resume on JobQueue |
* | + resume() : resume coroutine on caller |
* | + runnable() : !task_.done() |
* | + expectEarlyExit() : teardown for failed post |
* | + join() : block until not running |
* +---------------------------------------------------+
* | |
* | owns | references
* v v
* CoroTask<void> JobQueue
* (coroutine frame) (thread pool + nSuspend_)
*
* FuncBase / FuncStore<F> (type-erased heap storage
* for the coroutine lambda)
*
* Coroutine Lifecycle (Control Flow)
* ===================================
*
* Caller thread JobQueue worker thread
* ------------- ----------------------
* postCoroTask(f)
* |
* +-- check stopping_ (reject if JQ shutting down)
* +-- ++nSuspend_ (lazy start counts as suspended)
* +-- make_shared<CoroTaskRunner>
* +-- init(f)
* | +-- store lambda on heap (FuncStore)
* | +-- task_ = f(shared_from_this())
* | [coroutine created, suspended at initial_suspend]
* +-- post()
* | +-- ++runCount_
* | +-- addJob(type_, [resume]{})
* | resume()
* | |
* | +-- --nSuspend_
* | +-- swap in LocalValues
* | +-- task_.handle().resume()
* | | [coroutine body runs]
* | | ...
* | | co_await suspend()
* | | +-- ++nSuspend_
* | | [coroutine suspends]
* | +-- swap out LocalValues
* | +-- --runCount_
* | +-- cv_.notify_all()
* |
* post() <-- called externally or by yieldAndPost()
* +-- ++runCount_
* +-- addJob(type_, [resume]{})
* resume()
* |
* +-- [coroutine body continues]
* +-- co_return
* +-- --runCount_
* +-- cv_.notify_all()
* join()
* +-- cv_.wait([]{runCount_ == 0})
* +-- [done]
*
* Usage Examples
* ==============
*
* 1. Fire-and-forget coroutine (most common pattern):
*
* jq.postCoroTask(jtCLIENT, "MyWork",
* [](auto runner) -> CoroTask<void> {
* doSomeWork();
* co_await runner->suspend(); // yield to other jobs
* doMoreWork();
* co_return;
* });
*
* 2. Manually controlling suspend / resume (external trigger):
*
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
* [&result](auto runner) -> CoroTask<void> {
* startAsyncOperation(callback);
* co_await runner->suspend();
* // callback called runner->post() to get here
* result = collectResult();
* co_return;
* });
* // ... later, from the callback:
* runner->post(); // reschedule the coroutine on the JobQueue
*
* 3. Using yieldAndPost() for automatic suspend + repost:
*
* jq.postCoroTask(jtCLIENT, "AutoRepost",
* [](auto runner) -> CoroTask<void> {
* step1();
* co_await runner->yieldAndPost(); // yield + auto-repost
* step2();
* co_await runner->yieldAndPost();
* step3();
* co_return;
* });
*
* 4. Checking shutdown after co_await (cooperative cancellation):
*
* jq.postCoroTask(jtCLIENT, "Cancellable",
* [&jq](auto runner) -> CoroTask<void> {
* while (moreWork()) {
* co_await runner->yieldAndPost();
* if (jq.isStopping())
* co_return; // bail out cleanly
* processNextItem();
* }
* co_return;
* });
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Calling suspend() without a matching post()/resume().
* After co_await runner->suspend(), the coroutine is parked and
* nSuspend_ is incremented. If nothing ever calls post() or
* resume(), the coroutine is leaked and JobQueue::stop() will
* hang forever waiting for nSuspend_ to reach zero.
*
* BUG-RISK: Calling post() on an already-running coroutine.
* post() schedules a resume() job. If the coroutine has not
* actually suspended yet (no co_await executed), the resume job
* will try to call handle().resume() while the coroutine is still
* running on another thread. This is UB. The mutex_ prevents
* data corruption but the logic is wrong — always co_await
* suspend() before calling post(). (The test incorrect_order()
* shows this works only because mutex_ serializes the calls.)
*
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
* The CoroTaskRunner destructor asserts that finished_ is true
* (the coroutine completed). If you let the last shared_ptr die
* while the coroutine is still running or suspended, you get an
* assertion failure in debug and UB in release. Always call
* join() or expectEarlyExit() first.
*
* BUG-RISK: Lambda captures outliving the coroutine frame.
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
* to prevent dangling. But objects captured by pointer still need
* their own lifetime management. If you capture a raw pointer to
* a stack variable, and the stack frame exits before the coroutine
* finishes, the pointer dangles. Use shared_ptr or ensure the
* pointed-to object outlives the coroutine.
*
* BUG-RISK: Forgetting co_return in a void coroutine.
* If the coroutine body falls off the end without co_return,
* the compiler may silently treat it as co_return (per standard),
* but some compilers warn. Always write explicit co_return.
*
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
* The task_ member is CoroTask<void>. To return values from
* the top-level coroutine, write through a captured pointer
* (as the tests demonstrate), or co_await inner CoroTask<T>
* coroutines that return values.
*
* LIMITATION: One coroutine per CoroTaskRunner.
* init() must be called exactly once. You cannot reuse a
* CoroTaskRunner to run a second coroutine. Create a new one
* via postCoroTask() instead.
*
* LIMITATION: No timeout on join().
* join() blocks indefinitely. If the coroutine is suspended
* and never posted, join() will deadlock. Use timed waits
* on the gate pattern (condition_variable + wait_for) in tests.
*/
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
{
private:
// Per-coroutine thread-local storage. Swapped in before resume()
// and swapped out after, so each coroutine sees its own LocalValue
// state regardless of which worker thread executes it.
detail::LocalValues lvs_;
// Back-reference to the owning JobQueue. Used to post jobs,
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
JobQueue& jq_;
// Job type passed to addJob() when posting this coroutine.
JobType type_;
// Human-readable name for this coroutine job (for logging).
std::string name_;
// Number of in-flight resume operations (pending + active).
// Incremented by post(), decremented when resume() finishes.
// Guarded by mutex_run_. join() blocks until this reaches 0.
//
// A counter (not a bool) is needed because post() can be called
// from within the coroutine body (e.g. via yieldAndPost()),
// enqueuing a second resume while the first is still running.
// A bool would be clobbered: R2.post() sets true, then R1's
// cleanup sets false — losing the fact that R2 is still pending.
int runCount_;
// Serializes all coroutine resume() calls, preventing concurrent
// execution of the coroutine body on multiple threads. Handles the
// race where post() enqueues a resume before the coroutine has
// actually suspended (post-before-suspend pattern).
std::mutex mutex_;
// Guards runCount_. Used with cv_ for join() to wait
// until all pending/active resume operations complete.
std::mutex mutex_run_;
// Notified when runCount_ reaches zero, allowing
// join() waiters to wake up.
std::condition_variable cv_;
// The coroutine handle wrapper. Owns the coroutine frame.
// Set by init(). Reset to empty in resume() upon coroutine
// completion (to break the shared_ptr cycle) or in
// expectEarlyExit() on early termination.
CoroTask<void> task_;
/**
* Type-erased base for heap-stored callables.
* Prevents the coroutine lambda from being destroyed before
* the coroutine frame is done with it.
*
* @see FuncStore
*/
struct FuncBase
{
virtual ~FuncBase() = default;
};
/**
* Concrete type-erased storage for a callable of type F.
* The coroutine frame stores a reference to the lambda's implicit
* object parameter. If the lambda is a temporary, that reference
* dangles after the call returns. FuncStore keeps it alive on
* the heap for the lifetime of the CoroTaskRunner.
*/
template <class F>
struct FuncStore : FuncBase
{
F func; // The stored callable (coroutine lambda).
explicit FuncStore(F&& f) : func(std::move(f))
{
}
};
// Heap-allocated callable storage. Set by init(), ensures the
// lambda outlives the coroutine frame that references it.
std::unique_ptr<FuncBase> storedFunc_;
// True once the coroutine has completed or expectEarlyExit() was
// called. Asserted in the destructor (debug) to catch leaked
// runners. Available in all builds to guard expectEarlyExit()
// against double-decrementing nSuspend_.
bool finished_ = false;
public:
/**
* Tag type for private construction. Prevents external code
* from constructing CoroTaskRunner directly. Use postCoroTask().
*/
struct create_t
{
explicit create_t() = default;
};
/**
* Construct a CoroTaskRunner. Private by convention (create_t tag).
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
~CoroTaskRunner();
/**
* Initialize with a coroutine-returning callable.
* Must be called exactly once, after the object is managed by
* shared_ptr (because init uses shared_from_this internally).
* This is handled automatically by postCoroTask().
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
init(F&& f);
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
* Called when the coroutine is about to suspend. Every call
* must be balanced by a corresponding decrement (via resume()
* or onUndoSuspend()), or JobQueue::stop() will hang.
*/
void
onSuspend();
/**
* Decrement nSuspend_ without resuming.
* Used to undo onSuspend() when a scheduled post() fails
* (e.g. JobQueue is stopping).
*/
void
onUndoSuspend();
/**
* Suspend the coroutine.
* The awaiter's await_suspend() increments nSuspend_ before the
* coroutine actually suspends. The caller must later call post()
* or resume() to continue execution.
*
* @return An awaiter for use with `co_await runner->suspend()`
*/
auto
suspend();
/**
* Suspend the coroutine and immediately repost it on the
* JobQueue. Combines suspend() + post() atomically inside
* await_suspend, so there is no window where an external
* event could race between the two.
*
* Equivalent to JobQueueAwaiter but defined as an inline
* awaiter returned from a member function. This avoids a
* GCC-12 coroutine codegen bug where an external awaiter
* struct (JobQueueAwaiter) used at multiple co_await points
* corrupts the coroutine state machine's resume index,
* causing the coroutine to hang on the third resumption.
*
* @return An awaiter for use with `co_await runner->yieldAndPost()`
*/
auto
yieldAndPost();
/**
* Schedule coroutine resumption as a job on the JobQueue.
* Captures shared_from_this() to prevent this runner from being
* destroyed while the job is queued.
*
* @return true if the job was accepted; false if the JobQueue
* is stopping (caller must handle cleanup)
*/
bool
post();
/**
* Resume the coroutine on the current thread.
* Decrements nSuspend_, swaps in LocalValues, resumes the
* coroutine handle, swaps out LocalValues, and notifies join()
* waiters. Lock ordering (sequential, non-overlapping):
* jq_.m_mutex -> mutex_ -> mutex_run_.
*
* @pre post() must have been called before resume(). Direct
* calls without a prior post() will corrupt runCount_
* and break join().
*/
void
resume();
/**
* @return true if the coroutine has not yet run to completion
*/
bool
runnable() const;
/**
* Handle early termination when the coroutine never ran.
* Decrements nSuspend_ and destroys the coroutine frame to
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
* Called by postCoroTask() when post() fails.
*/
void
expectEarlyExit();
/**
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
* Warning: deadlocks if the coroutine is suspended and never posted.
*/
void
join();
};
using JobFunction = std::function<void()>;
JobQueue(
@@ -165,6 +580,19 @@ public:
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Creates a C++20 coroutine and adds a job to the queue to run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
int
@@ -379,6 +807,7 @@ private:
} // namespace xrpl
#include <xrpl/core/Coro.ipp>
#include <xrpl/core/CoroTaskRunner.ipp>
namespace xrpl {
@@ -401,4 +830,69 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
return coro;
}
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
//
// Control Flow
// ============
//
// postCoroTask(t, name, f)
// |
// +-- 1. Check stopping_ — reject if JQ shutting down
// |
// +-- 2. ++nSuspend_ (mirrors Boost Coro ctor's implicit yield)
// | The coroutine is "suspended" from the JobQueue's perspective
// | even though it hasn't run yet — this keeps the JQ shutdown
// | logic correct (it waits for nSuspend_ to reach 0).
// |
// +-- 3. Create CoroTaskRunner (shared_ptr, ref-counted)
// |
// +-- 4. runner->init(f)
// | +-- Heap-allocate the lambda (FuncStore) to prevent
// | | dangling captures in the coroutine frame
// | +-- task_ = f(shared_from_this())
// | [coroutine created but NOT started — lazy initial_suspend]
// |
// +-- 5. runner->post()
// | +-- addJob(type_, [resume]{}) → resume on worker thread
// | +-- failure (JQ stopping):
// | +-- runner->expectEarlyExit()
// | | --nSuspend_, destroy coroutine frame
// | +-- return nullptr
//
// Why async post() instead of synchronous resume()?
// ==================================================
// The initial dispatch MUST use async post() so the coroutine body runs on
// a JobQueue worker thread, not the caller's thread. resume() swaps the
// caller's thread-local LocalValues with the coroutine's private copy.
// If the coroutine mutates LocalValues (e.g. thread_specific_storage test),
// those mutations bleed back into the caller's thread-local state after the
// swap-out, corrupting subsequent tests that share the same thread pool.
// Async post() avoids this by running the coroutine on a worker thread whose
// LocalValues are managed by the thread pool, not by the caller.
//
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
{
// Reject if the JQ is shutting down — matches addJob()'s stopping_ check.
// Must check before incrementing nSuspend_ to avoid leaving an orphan
// count that would cause stop() to hang.
if (stopping_)
return nullptr;
{
std::lock_guard lock(m_mutex);
++nSuspend_;
}
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
}
} // namespace xrpl

View File

@@ -0,0 +1,206 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/**
* Awaiter that suspends and immediately reschedules on the JobQueue.
* Equivalent to calling yield() followed by post() in the old Coro API.
*
* Usage:
* co_await JobQueueAwaiter{runner};
*
* What it waits for: The coroutine is re-queued as a job and resumes
* when a worker thread picks it up.
*
* Which thread resumes: A JobQueue worker thread.
*
* What await_resume() returns: void.
*
* Dependency Diagram
* ==================
*
* JobQueueAwaiter
* +----------------------------------------------+
* | + runner : shared_ptr<CoroTaskRunner> |
* +----------------------------------------------+
* | + await_ready() -> false (always suspend) |
* | + await_suspend() -> bool (suspend or cancel) |
* | + await_resume() -> void |
* +----------------------------------------------+
* | |
* | uses | uses
* v v
* CoroTaskRunner JobQueue
* .onSuspend() (via runner->post() -> addJob)
* .onUndoSuspend()
* .post()
*
* Control Flow (await_suspend)
* ============================
*
* co_await JobQueueAwaiter{runner}
* |
* +-- await_ready() -> false
* +-- await_suspend(handle)
* |
* +-- runner->onSuspend() // ++nSuspend_
* +-- runner->post() // addJob to JobQueue
* | |
* | +-- success? return noop_coroutine()
* | | // coroutine stays suspended;
* | | // worker thread will call resume()
* | +-- failure? (JQ stopping)
* | +-- runner->onUndoSuspend() // --nSuspend_
* | +-- return handle // symmetric transfer back
* | // coroutine continues immediately
* | // so it can clean up and co_return
*
* DEPRECATED — prefer `co_await runner->yieldAndPost()`
* =====================================================
*
* GCC-12 has a coroutine codegen bug where using this external awaiter
* struct at multiple co_await points in the same coroutine corrupts the
* state machine's resume index. After the second co_await, the third
* resumption enters handle().resume() but never reaches await_resume()
* or any subsequent user code — the coroutine hangs indefinitely.
*
* The fix is `co_await runner->yieldAndPost()`, which defines the
* awaiter as an inline struct inside a CoroTaskRunner member function.
* GCC-12 handles inline awaiters correctly at multiple co_await points.
*
* This struct is retained for single-use scenarios and documentation
* purposes. For any code that may use co_await in a loop or at
* multiple points, always use `runner->yieldAndPost()`.
*
* Usage Examples
* ==============
*
* 1. Yield and auto-repost (preferred — works on all compilers):
*
* CoroTask<void> handler(auto runner) {
* doPartA();
* co_await runner->yieldAndPost(); // yield + repost
* doPartB(); // runs on a worker thread
* co_return;
* }
*
* 2. Multiple yield points in a loop:
*
* CoroTask<void> batchProcessor(auto runner) {
* for (auto& item : items) {
* process(item);
* co_await runner->yieldAndPost(); // let other jobs run
* }
* co_return;
* }
*
* 3. Graceful shutdown — checking after resume:
*
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
* while (hasWork()) {
* co_await runner->yieldAndPost();
* // If JQ is stopping, await_suspend resumes the coroutine
* // immediately without re-queuing. Always check
* // isStopping() to decide whether to proceed:
* if (jq.isStopping())
* co_return;
* doNextChunk();
* }
* co_return;
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Using a stale or null runner.
* The runner shared_ptr must be valid and point to the CoroTaskRunner
* that owns the coroutine currently executing. Passing a runner from
* a different coroutine, or a default-constructed shared_ptr, is UB.
*
* BUG-RISK: Assuming resume happens on the same thread.
* After co_await, the coroutine resumes on whatever worker thread
* picks up the job. Do not rely on thread-local state unless it is
* managed through LocalValue (which CoroTaskRunner automatically
* swaps in/out).
*
* BUG-RISK: Ignoring the shutdown path.
* When the JobQueue is stopping, post() fails and await_suspend()
* resumes the coroutine immediately (symmetric transfer back to h).
* The coroutine body continues on the same thread. If your code
* after co_await assumes it was re-queued and is running on a worker
* thread, that assumption breaks during shutdown. Always handle the
* "JQ is stopping" case, either by checking jq.isStopping() or by
* letting the coroutine fall through to co_return naturally.
*
* DIFFERENCE from runner->suspend() + runner->post():
* Both JobQueueAwaiter and yieldAndPost() combine suspend + post
* in one atomic operation. With the manual suspend()/post() pattern,
* there is a window between the two calls where an external event
* could race. The atomic awaiters remove that window — onSuspend()
* and post() happen within the same await_suspend() call while the
* coroutine is guaranteed to be suspended. Use yieldAndPost() unless
* you need an external party to decide *when* to call post().
*/
struct JobQueueAwaiter
{
// The CoroTaskRunner that owns the currently executing coroutine.
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Increment nSuspend (equivalent to yield()) and schedule resume
* on the JobQueue (equivalent to post()). If the JobQueue is
* stopping, undoes the suspend count and transfers back to the
* coroutine so it can clean up and co_return.
*
* Returns a coroutine_handle<> (symmetric transfer) instead of
* bool to work around a GCC-12 codegen bug where bool-returning
* await_suspend leaves the coroutine in an invalid state —
* neither properly suspended nor resumed — causing a hang.
*
* WARNING: GCC-12 has an additional codegen bug where using this
* external awaiter struct at multiple co_await points in the same
* coroutine corrupts the state machine's resume index, causing the
* coroutine to hang on the third resumption. Prefer
* `co_await runner->yieldAndPost()` which uses an inline awaiter
* that GCC-12 handles correctly.
*
* @return noop_coroutine() to stay suspended (job posted);
* the caller's handle to resume immediately (JQ stopping)
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> h)
{
XRPL_ASSERT(runner, "xrpl::JobQueueAwaiter::await_suspend : runner is valid");
runner->onSuspend();
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// transfer back to the coroutine so it can clean up
// and co_return.
runner->onUndoSuspend();
return h;
}
return std::noop_coroutine();
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -42,7 +42,7 @@ TRANSACTION(ttPAYMENT, 0, Payment,
/** This transaction type creates an escrow object. */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/escrow/EscrowCreate.h>
# include <xrpl/tx/transactors/escrow/Escrow.h>
#endif
TRANSACTION(ttESCROW_CREATE, 1, EscrowCreate,
Delegation::delegable,
@@ -58,9 +58,6 @@ TRANSACTION(ttESCROW_CREATE, 1, EscrowCreate,
}))
/** This transaction type completes an existing escrow. */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/escrow/EscrowFinish.h>
#endif
TRANSACTION(ttESCROW_FINISH, 2, EscrowFinish,
Delegation::delegable,
uint256{},
@@ -97,7 +94,7 @@ TRANSACTION(ttACCOUNT_SET, 3, AccountSet,
/** This transaction type cancels an existing escrow. */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/escrow/EscrowCancel.h>
# include <xrpl/tx/transactors/escrow/Escrow.h>
#endif
TRANSACTION(ttESCROW_CANCEL, 4, EscrowCancel,
Delegation::delegable,
@@ -183,7 +180,7 @@ TRANSACTION(ttSIGNER_LIST_SET, 12, SignerListSet,
/** This transaction type creates a new unidirectional XRP payment channel. */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/payment_channel/PayChanCreate.h>
# include <xrpl/tx/transactors/payment_channel/PayChan.h>
#endif
TRANSACTION(ttPAYCHAN_CREATE, 13, PaymentChannelCreate,
Delegation::delegable,
@@ -199,9 +196,6 @@ TRANSACTION(ttPAYCHAN_CREATE, 13, PaymentChannelCreate,
}))
/** This transaction type funds an existing unidirectional XRP payment channel. */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/payment_channel/PayChanFund.h>
#endif
TRANSACTION(ttPAYCHAN_FUND, 14, PaymentChannelFund,
Delegation::delegable,
uint256{},
@@ -213,9 +207,6 @@ TRANSACTION(ttPAYCHAN_FUND, 14, PaymentChannelFund,
}))
/** This transaction type submits a claim against an existing unidirectional payment channel. */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/payment_channel/PayChanClaim.h>
#endif
TRANSACTION(ttPAYCHAN_CLAIM, 15, PaymentChannelClaim,
Delegation::delegable,
uint256{},
@@ -626,7 +617,7 @@ TRANSACTION(ttXCHAIN_CREATE_BRIDGE, 48, XChainCreateBridge,
/** This transaction type creates or updates a DID */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/did/DIDSet.h>
# include <xrpl/tx/transactors/did/DID.h>
#endif
TRANSACTION(ttDID_SET, 49, DIDSet,
Delegation::delegable,
@@ -639,9 +630,6 @@ TRANSACTION(ttDID_SET, 49, DIDSet,
}))
/** This transaction type deletes a DID */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/did/DIDDelete.h>
#endif
TRANSACTION(ttDID_DELETE, 50, DIDDelete,
Delegation::delegable,
featureDID,
@@ -751,7 +739,7 @@ TRANSACTION(ttMPTOKEN_AUTHORIZE, 57, MPTokenAuthorize,
/** This transaction type create an Credential instance */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/credentials/CredentialCreate.h>
# include <xrpl/tx/transactors/credentials/Credentials.h>
#endif
TRANSACTION(ttCREDENTIAL_CREATE, 58, CredentialCreate,
Delegation::delegable,
@@ -765,9 +753,6 @@ TRANSACTION(ttCREDENTIAL_CREATE, 58, CredentialCreate,
}))
/** This transaction type accept an Credential object */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/credentials/CredentialAccept.h>
#endif
TRANSACTION(ttCREDENTIAL_ACCEPT, 59, CredentialAccept,
Delegation::delegable,
featureCredentials,
@@ -778,9 +763,6 @@ TRANSACTION(ttCREDENTIAL_ACCEPT, 59, CredentialAccept,
}))
/** This transaction type delete an Credential object */
#if TRANSACTION_INCLUDE
# include <xrpl/tx/transactors/credentials/CredentialDelete.h>
#endif
TRANSACTION(ttCREDENTIAL_DELETE, 60, CredentialDelete,
Delegation::delegable,
featureCredentials,

View File

@@ -1,29 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class CredentialAccept : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit CredentialAccept(ApplyContext& ctx) : Transactor(ctx)
{
}
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -1,29 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class CredentialCreate : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit CredentialCreate(ApplyContext& ctx) : Transactor(ctx)
{
}
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -1,29 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class CredentialDelete : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit CredentialDelete(ApplyContext& ctx) : Transactor(ctx)
{
}
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -0,0 +1,77 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class CredentialCreate : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit CredentialCreate(ApplyContext& ctx) : Transactor(ctx)
{
}
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
//------------------------------------------------------------------------------
class CredentialDelete : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit CredentialDelete(ApplyContext& ctx) : Transactor(ctx)
{
}
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
//------------------------------------------------------------------------------
class CredentialAccept : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit CredentialAccept(ApplyContext& ctx) : Transactor(ctx)
{
}
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -4,6 +4,24 @@
namespace xrpl {
class DIDSet : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit DIDSet(ApplyContext& ctx) : Transactor(ctx)
{
}
static NotTEC
preflight(PreflightContext const& ctx);
TER
doApply() override;
};
//------------------------------------------------------------------------------
class DIDDelete : public Transactor
{
public:

View File

@@ -1,23 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class DIDSet : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit DIDSet(ApplyContext& ctx) : Transactor(ctx)
{
}
static NotTEC
preflight(PreflightContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -0,0 +1,80 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class EscrowCreate : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
explicit EscrowCreate(ApplyContext& ctx) : Transactor(ctx)
{
}
static TxConsequences
makeTxConsequences(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
//------------------------------------------------------------------------------
class EscrowFinish : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit EscrowFinish(ApplyContext& ctx) : Transactor(ctx)
{
}
static bool
checkExtraFeatures(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static NotTEC
preflightSigValidated(PreflightContext const& ctx);
static XRPAmount
calculateBaseFee(ReadView const& view, STTx const& tx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
//------------------------------------------------------------------------------
class EscrowCancel : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit EscrowCancel(ApplyContext& ctx) : Transactor(ctx)
{
}
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -1,26 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class EscrowCancel : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit EscrowCancel(ApplyContext& ctx) : Transactor(ctx)
{
}
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -1,29 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class EscrowCreate : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
explicit EscrowCreate(ApplyContext& ctx) : Transactor(ctx)
{
}
static TxConsequences
makeTxConsequences(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -1,35 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class EscrowFinish : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit EscrowFinish(ApplyContext& ctx) : Transactor(ctx)
{
}
static bool
checkExtraFeatures(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static NotTEC
preflightSigValidated(PreflightContext const& ctx);
static XRPAmount
calculateBaseFee(ReadView const& view, STTx const& tx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
} // namespace xrpl

View File

@@ -0,0 +1,83 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class PayChanCreate : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
explicit PayChanCreate(ApplyContext& ctx) : Transactor(ctx)
{
}
static TxConsequences
makeTxConsequences(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
using PaymentChannelCreate = PayChanCreate;
//------------------------------------------------------------------------------
class PayChanFund : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
explicit PayChanFund(ApplyContext& ctx) : Transactor(ctx)
{
}
static TxConsequences
makeTxConsequences(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
TER
doApply() override;
};
using PaymentChannelFund = PayChanFund;
//------------------------------------------------------------------------------
class PayChanClaim : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit PayChanClaim(ApplyContext& ctx) : Transactor(ctx)
{
}
static bool
checkExtraFeatures(PreflightContext const& ctx);
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
using PaymentChannelClaim = PayChanClaim;
} // namespace xrpl

View File

@@ -1,34 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class PayChanClaim : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Normal};
explicit PayChanClaim(ApplyContext& ctx) : Transactor(ctx)
{
}
static bool
checkExtraFeatures(PreflightContext const& ctx);
static std::uint32_t
getFlagsMask(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
using PaymentChannelClaim = PayChanClaim;
} // namespace xrpl

View File

@@ -1,31 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class PayChanCreate : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
explicit PayChanCreate(ApplyContext& ctx) : Transactor(ctx)
{
}
static TxConsequences
makeTxConsequences(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
static TER
preclaim(PreclaimContext const& ctx);
TER
doApply() override;
};
using PaymentChannelCreate = PayChanCreate;
} // namespace xrpl

View File

@@ -1,28 +0,0 @@
#pragma once
#include <xrpl/tx/Transactor.h>
namespace xrpl {
class PayChanFund : public Transactor
{
public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
explicit PayChanFund(ApplyContext& ctx) : Transactor(ctx)
{
}
static TxConsequences
makeTxConsequences(PreflightContext const& ctx);
static NotTEC
preflight(PreflightContext const& ctx);
TER
doApply() override;
};
using PaymentChannelFund = PayChanFund;
} // namespace xrpl

View File

@@ -168,7 +168,7 @@ decode(void* dest, char const* src, std::size_t len)
break;
++in;
c4[i] = v;
if (++i == 4)
if (++i; i == 4)
{
c3[0] = (c4[0] << 2) + ((c4[1] & 0x30) >> 4);
c3[1] = ((c4[1] & 0xf) << 4) + ((c4[2] & 0x3c) >> 2);

View File

@@ -729,21 +729,18 @@ Reader::decodeUnicodeCodePoint(Token& token, Location& current, Location end, un
unsigned int surrogatePair;
if (*(current++) == '\\' && *(current++) == 'u')
{
if (decodeUnicodeEscapeSequence(token, current, end, surrogatePair))
{
unicode = 0x10000 + ((unicode & 0x3FF) << 10) + (surrogatePair & 0x3FF);
}
else
return false;
}
else
if (*current != '\\' || *(current + 1) != 'u')
return addError(
"expecting another \\u token to begin the second half of a "
"unicode surrogate pair",
"expecting another \\u token to begin the second half of a unicode surrogate pair",
token,
current);
current += 2; // skip two characters checked above
if (!decodeUnicodeEscapeSequence(token, current, end, surrogatePair))
return false;
unicode = 0x10000 + ((unicode & 0x3FF) << 10) + (surrogatePair & 0x3FF);
}
return true;

View File

@@ -319,7 +319,7 @@ StyledWriter::writeValue(Value const& value)
document_ += " : ";
writeValue(childValue);
if (++it == members.end())
if (++it; it == members.end())
break;
document_ += ",";

View File

@@ -74,8 +74,10 @@ BookDirs::const_iterator::operator++()
XRPL_ASSERT(index_ != zero, "xrpl::BookDirs::const_iterator::operator++ : nonzero index");
if (!cdirNext(*view_, cur_key_, sle_, entry_, index_))
{
if (index_ != 0 ||
(cur_key_ = view_->succ(++cur_key_, next_quality_).value_or(zero)) == zero)
if (index_ == 0)
cur_key_ = view_->succ(++cur_key_, next_quality_).value_or(zero);
if (index_ != 0 || cur_key_ == zero)
{
cur_key_ = key_;
entry_ = 0;
@@ -84,9 +86,7 @@ BookDirs::const_iterator::operator++()
else if (!cdirFirst(*view_, cur_key_, sle_, entry_, index_))
{
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::BookDirs::const_iterator::operator++ : directory is "
"empty");
UNREACHABLE("xrpl::BookDirs::const_iterator::operator++ : directory is empty");
// LCOV_EXCL_STOP
}
}

View File

@@ -23,6 +23,9 @@ STBase::STBase(SField const& n) : fName(&n)
STBase&
STBase::operator=(STBase const& t)
{
if (this == &t)
return *this;
if (!fName->isUseful())
fName = t.fName;
return *this;

View File

@@ -39,6 +39,9 @@ Consumer::~Consumer()
Consumer&
Consumer::operator=(Consumer const& other)
{
if (this == &other)
return *this;
// remove old ref
if (m_logic && m_entry)
m_logic->release(*m_entry);

View File

@@ -11,7 +11,7 @@
#include <xrpl/tx/transactors/account/DeleteAccount.h>
#include <xrpl/tx/transactors/account/SetSignerList.h>
#include <xrpl/tx/transactors/delegate/DelegateSet.h>
#include <xrpl/tx/transactors/did/DIDDelete.h>
#include <xrpl/tx/transactors/did/DID.h>
#include <xrpl/tx/transactors/nft/NFTokenUtils.h>
#include <xrpl/tx/transactors/oracle/DeleteOracle.h>
#include <xrpl/tx/transactors/payment/DepositPreauth.h>

View File

@@ -1,113 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/credentials/CredentialAccept.h>
#include <chrono>
namespace xrpl {
using namespace credentials;
std::uint32_t
CredentialAccept::getFlagsMask(PreflightContext const& ctx)
{
// 0 means "Allow any flags"
return ctx.rules.enabled(fixInvalidTxFlags) ? tfUniversalMask : 0;
}
NotTEC
CredentialAccept::preflight(PreflightContext const& ctx)
{
if (!ctx.tx[sfIssuer])
{
JLOG(ctx.j.trace()) << "Malformed transaction: Issuer field zeroed.";
return temINVALID_ACCOUNT_ID;
}
auto const credType = ctx.tx[sfCredentialType];
if (credType.empty() || (credType.size() > maxCredentialTypeLength))
{
JLOG(ctx.j.trace()) << "Malformed transaction: invalid size of CredentialType.";
return temMALFORMED;
}
return tesSUCCESS;
}
TER
CredentialAccept::preclaim(PreclaimContext const& ctx)
{
AccountID const subject = ctx.tx[sfAccount];
AccountID const issuer = ctx.tx[sfIssuer];
auto const credType(ctx.tx[sfCredentialType]);
if (!ctx.view.exists(keylet::account(issuer)))
{
JLOG(ctx.j.warn()) << "No issuer: " << to_string(issuer);
return tecNO_ISSUER;
}
auto const sleCred = ctx.view.read(keylet::credential(subject, issuer, credType));
if (!sleCred)
{
JLOG(ctx.j.warn()) << "No credential: " << to_string(subject) << ", " << to_string(issuer)
<< ", " << credType;
return tecNO_ENTRY;
}
if (sleCred->getFieldU32(sfFlags) & lsfAccepted)
{
JLOG(ctx.j.warn()) << "Credential already accepted: " << to_string(subject) << ", "
<< to_string(issuer) << ", " << credType;
return tecDUPLICATE;
}
return tesSUCCESS;
}
TER
CredentialAccept::doApply()
{
AccountID const issuer{ctx_.tx[sfIssuer]};
// Both exist as credential object exist itself (checked in preclaim)
auto const sleSubject = view().peek(keylet::account(account_));
auto const sleIssuer = view().peek(keylet::account(issuer));
if (!sleSubject || !sleIssuer)
return tefINTERNAL; // LCOV_EXCL_LINE
{
STAmount const reserve{
view().fees().accountReserve(sleSubject->getFieldU32(sfOwnerCount) + 1)};
if (mPriorBalance < reserve)
return tecINSUFFICIENT_RESERVE;
}
auto const credType(ctx_.tx[sfCredentialType]);
Keylet const credentialKey = keylet::credential(account_, issuer, credType);
auto const sleCred = view().peek(credentialKey); // Checked in preclaim()
if (checkExpired(sleCred, view().header().parentCloseTime))
{
JLOG(j_.trace()) << "Credential is expired: " << sleCred->getText();
// delete expired credentials even if the transaction failed
auto const err = credentials::deleteSLE(view(), sleCred, j_);
return isTesSuccess(err) ? tecEXPIRED : err;
}
sleCred->setFieldU32(sfFlags, lsfAccepted);
view().update(sleCred);
adjustOwnerCount(view(), sleIssuer, -1, j_);
adjustOwnerCount(view(), sleSubject, 1, j_);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,164 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/credentials/CredentialCreate.h>
#include <chrono>
namespace xrpl {
/*
Credentials
======
A verifiable credentials (VC
https://en.wikipedia.org/wiki/Verifiable_credentials), as defined by the W3C
specification (https://www.w3.org/TR/vc-data-model-2.0/), is a
secure and tamper-evident way to represent information about a subject, such
as an individual, organization, or even an IoT device. These credentials are
issued by a trusted entity and can be verified by third parties without
directly involving the issuer at all.
*/
using namespace credentials;
std::uint32_t
CredentialCreate::getFlagsMask(PreflightContext const& ctx)
{
// 0 means "Allow any flags"
return ctx.rules.enabled(fixInvalidTxFlags) ? tfUniversalMask : 0;
}
NotTEC
CredentialCreate::preflight(PreflightContext const& ctx)
{
auto const& tx = ctx.tx;
auto& j = ctx.j;
if (!tx[sfSubject])
{
JLOG(j.trace()) << "Malformed transaction: Invalid Subject";
return temMALFORMED;
}
auto const uri = tx[~sfURI];
if (uri && (uri->empty() || (uri->size() > maxCredentialURILength)))
{
JLOG(j.trace()) << "Malformed transaction: invalid size of URI.";
return temMALFORMED;
}
auto const credType = tx[sfCredentialType];
if (credType.empty() || (credType.size() > maxCredentialTypeLength))
{
JLOG(j.trace()) << "Malformed transaction: invalid size of CredentialType.";
return temMALFORMED;
}
return tesSUCCESS;
}
TER
CredentialCreate::preclaim(PreclaimContext const& ctx)
{
auto const credType(ctx.tx[sfCredentialType]);
auto const subject = ctx.tx[sfSubject];
if (!ctx.view.exists(keylet::account(subject)))
{
JLOG(ctx.j.trace()) << "Subject doesn't exist.";
return tecNO_TARGET;
}
if (ctx.view.exists(keylet::credential(subject, ctx.tx[sfAccount], credType)))
{
JLOG(ctx.j.trace()) << "Credential already exists.";
return tecDUPLICATE;
}
return tesSUCCESS;
}
TER
CredentialCreate::doApply()
{
auto const subject = ctx_.tx[sfSubject];
auto const credType(ctx_.tx[sfCredentialType]);
Keylet const credentialKey = keylet::credential(subject, account_, credType);
auto const sleCred = std::make_shared<SLE>(credentialKey);
if (!sleCred)
return tefINTERNAL; // LCOV_EXCL_LINE
auto const optExp = ctx_.tx[~sfExpiration];
if (optExp)
{
std::uint32_t const closeTime =
ctx_.view().header().parentCloseTime.time_since_epoch().count();
if (closeTime > *optExp)
{
JLOG(j_.trace()) << "Malformed transaction: "
"Expiration time is in the past.";
return tecEXPIRED;
}
sleCred->setFieldU32(sfExpiration, ctx_.tx.getFieldU32(sfExpiration));
}
auto const sleIssuer = view().peek(keylet::account(account_));
if (!sleIssuer)
return tefINTERNAL; // LCOV_EXCL_LINE
{
STAmount const reserve{
view().fees().accountReserve(sleIssuer->getFieldU32(sfOwnerCount) + 1)};
if (mPriorBalance < reserve)
return tecINSUFFICIENT_RESERVE;
}
sleCred->setAccountID(sfSubject, subject);
sleCred->setAccountID(sfIssuer, account_);
sleCred->setFieldVL(sfCredentialType, credType);
if (ctx_.tx.isFieldPresent(sfURI))
sleCred->setFieldVL(sfURI, ctx_.tx.getFieldVL(sfURI));
{
auto const page =
view().dirInsert(keylet::ownerDir(account_), credentialKey, describeOwnerDir(account_));
JLOG(j_.trace()) << "Adding Credential to owner directory " << to_string(credentialKey.key)
<< ": " << (page ? "success" : "failure");
if (!page)
return tecDIR_FULL;
sleCred->setFieldU64(sfIssuerNode, *page);
adjustOwnerCount(view(), sleIssuer, 1, j_);
}
if (subject == account_)
{
sleCred->setFieldU32(sfFlags, lsfAccepted);
}
else
{
auto const page =
view().dirInsert(keylet::ownerDir(subject), credentialKey, describeOwnerDir(subject));
JLOG(j_.trace()) << "Adding Credential to owner directory " << to_string(credentialKey.key)
<< ": " << (page ? "success" : "failure");
if (!page)
return tecDIR_FULL;
sleCred->setFieldU64(sfSubjectNode, *page);
view().update(view().peek(keylet::account(subject)));
}
view().insert(sleCred);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,90 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/credentials/CredentialDelete.h>
#include <chrono>
namespace xrpl {
using namespace credentials;
std::uint32_t
CredentialDelete::getFlagsMask(PreflightContext const& ctx)
{
// 0 means "Allow any flags"
return ctx.rules.enabled(fixInvalidTxFlags) ? tfUniversalMask : 0;
}
NotTEC
CredentialDelete::preflight(PreflightContext const& ctx)
{
auto const subject = ctx.tx[~sfSubject];
auto const issuer = ctx.tx[~sfIssuer];
if (!subject && !issuer)
{
// Neither field is present, the transaction is malformed.
JLOG(ctx.j.trace()) << "Malformed transaction: "
"No Subject or Issuer fields.";
return temMALFORMED;
}
// Make sure that the passed account is valid.
if ((subject && subject->isZero()) || (issuer && issuer->isZero()))
{
JLOG(ctx.j.trace()) << "Malformed transaction: Subject or Issuer "
"field zeroed.";
return temINVALID_ACCOUNT_ID;
}
auto const credType = ctx.tx[sfCredentialType];
if (credType.empty() || (credType.size() > maxCredentialTypeLength))
{
JLOG(ctx.j.trace()) << "Malformed transaction: invalid size of CredentialType.";
return temMALFORMED;
}
return tesSUCCESS;
}
TER
CredentialDelete::preclaim(PreclaimContext const& ctx)
{
AccountID const account{ctx.tx[sfAccount]};
auto const subject = ctx.tx[~sfSubject].value_or(account);
auto const issuer = ctx.tx[~sfIssuer].value_or(account);
auto const credType(ctx.tx[sfCredentialType]);
if (!ctx.view.exists(keylet::credential(subject, issuer, credType)))
return tecNO_ENTRY;
return tesSUCCESS;
}
TER
CredentialDelete::doApply()
{
auto const subject = ctx_.tx[~sfSubject].value_or(account_);
auto const issuer = ctx_.tx[~sfIssuer].value_or(account_);
auto const credType(ctx_.tx[sfCredentialType]);
auto const sleCred = view().peek(keylet::credential(subject, issuer, credType));
if (!sleCred)
return tefINTERNAL; // LCOV_EXCL_LINE
if ((subject != account_) && (issuer != account_) &&
!checkExpired(sleCred, ctx_.view().header().parentCloseTime))
{
JLOG(j_.trace()) << "Can't delete non-expired credential.";
return tecNO_PERMISSION;
}
return deleteSLE(view(), sleCred, j_);
}
} // namespace xrpl

View File

@@ -0,0 +1,341 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/credentials/Credentials.h>
#include <chrono>
namespace xrpl {
/*
Credentials
======
A verifiable credentials (VC
https://en.wikipedia.org/wiki/Verifiable_credentials), as defined by the W3C
specification (https://www.w3.org/TR/vc-data-model-2.0/), is a
secure and tamper-evident way to represent information about a subject, such
as an individual, organization, or even an IoT device. These credentials are
issued by a trusted entity and can be verified by third parties without
directly involving the issuer at all.
*/
using namespace credentials;
// ------- CREATE --------------------------
std::uint32_t
CredentialCreate::getFlagsMask(PreflightContext const& ctx)
{
// 0 means "Allow any flags"
return ctx.rules.enabled(fixInvalidTxFlags) ? tfUniversalMask : 0;
}
NotTEC
CredentialCreate::preflight(PreflightContext const& ctx)
{
auto const& tx = ctx.tx;
auto& j = ctx.j;
if (!tx[sfSubject])
{
JLOG(j.trace()) << "Malformed transaction: Invalid Subject";
return temMALFORMED;
}
auto const uri = tx[~sfURI];
if (uri && (uri->empty() || (uri->size() > maxCredentialURILength)))
{
JLOG(j.trace()) << "Malformed transaction: invalid size of URI.";
return temMALFORMED;
}
auto const credType = tx[sfCredentialType];
if (credType.empty() || (credType.size() > maxCredentialTypeLength))
{
JLOG(j.trace()) << "Malformed transaction: invalid size of CredentialType.";
return temMALFORMED;
}
return tesSUCCESS;
}
TER
CredentialCreate::preclaim(PreclaimContext const& ctx)
{
auto const credType(ctx.tx[sfCredentialType]);
auto const subject = ctx.tx[sfSubject];
if (!ctx.view.exists(keylet::account(subject)))
{
JLOG(ctx.j.trace()) << "Subject doesn't exist.";
return tecNO_TARGET;
}
if (ctx.view.exists(keylet::credential(subject, ctx.tx[sfAccount], credType)))
{
JLOG(ctx.j.trace()) << "Credential already exists.";
return tecDUPLICATE;
}
return tesSUCCESS;
}
TER
CredentialCreate::doApply()
{
auto const subject = ctx_.tx[sfSubject];
auto const credType(ctx_.tx[sfCredentialType]);
Keylet const credentialKey = keylet::credential(subject, account_, credType);
auto const sleCred = std::make_shared<SLE>(credentialKey);
if (!sleCred)
return tefINTERNAL; // LCOV_EXCL_LINE
auto const optExp = ctx_.tx[~sfExpiration];
if (optExp)
{
std::uint32_t const closeTime =
ctx_.view().header().parentCloseTime.time_since_epoch().count();
if (closeTime > *optExp)
{
JLOG(j_.trace()) << "Malformed transaction: "
"Expiration time is in the past.";
return tecEXPIRED;
}
sleCred->setFieldU32(sfExpiration, ctx_.tx.getFieldU32(sfExpiration));
}
auto const sleIssuer = view().peek(keylet::account(account_));
if (!sleIssuer)
return tefINTERNAL; // LCOV_EXCL_LINE
{
STAmount const reserve{
view().fees().accountReserve(sleIssuer->getFieldU32(sfOwnerCount) + 1)};
if (mPriorBalance < reserve)
return tecINSUFFICIENT_RESERVE;
}
sleCred->setAccountID(sfSubject, subject);
sleCred->setAccountID(sfIssuer, account_);
sleCred->setFieldVL(sfCredentialType, credType);
if (ctx_.tx.isFieldPresent(sfURI))
sleCred->setFieldVL(sfURI, ctx_.tx.getFieldVL(sfURI));
{
auto const page =
view().dirInsert(keylet::ownerDir(account_), credentialKey, describeOwnerDir(account_));
JLOG(j_.trace()) << "Adding Credential to owner directory " << to_string(credentialKey.key)
<< ": " << (page ? "success" : "failure");
if (!page)
return tecDIR_FULL;
sleCred->setFieldU64(sfIssuerNode, *page);
adjustOwnerCount(view(), sleIssuer, 1, j_);
}
if (subject == account_)
{
sleCred->setFieldU32(sfFlags, lsfAccepted);
}
else
{
auto const page =
view().dirInsert(keylet::ownerDir(subject), credentialKey, describeOwnerDir(subject));
JLOG(j_.trace()) << "Adding Credential to owner directory " << to_string(credentialKey.key)
<< ": " << (page ? "success" : "failure");
if (!page)
return tecDIR_FULL;
sleCred->setFieldU64(sfSubjectNode, *page);
view().update(view().peek(keylet::account(subject)));
}
view().insert(sleCred);
return tesSUCCESS;
}
// ------- DELETE --------------------------
std::uint32_t
CredentialDelete::getFlagsMask(PreflightContext const& ctx)
{
// 0 means "Allow any flags"
return ctx.rules.enabled(fixInvalidTxFlags) ? tfUniversalMask : 0;
}
NotTEC
CredentialDelete::preflight(PreflightContext const& ctx)
{
auto const subject = ctx.tx[~sfSubject];
auto const issuer = ctx.tx[~sfIssuer];
if (!subject && !issuer)
{
// Neither field is present, the transaction is malformed.
JLOG(ctx.j.trace()) << "Malformed transaction: "
"No Subject or Issuer fields.";
return temMALFORMED;
}
// Make sure that the passed account is valid.
if ((subject && subject->isZero()) || (issuer && issuer->isZero()))
{
JLOG(ctx.j.trace()) << "Malformed transaction: Subject or Issuer "
"field zeroed.";
return temINVALID_ACCOUNT_ID;
}
auto const credType = ctx.tx[sfCredentialType];
if (credType.empty() || (credType.size() > maxCredentialTypeLength))
{
JLOG(ctx.j.trace()) << "Malformed transaction: invalid size of CredentialType.";
return temMALFORMED;
}
return tesSUCCESS;
}
TER
CredentialDelete::preclaim(PreclaimContext const& ctx)
{
AccountID const account{ctx.tx[sfAccount]};
auto const subject = ctx.tx[~sfSubject].value_or(account);
auto const issuer = ctx.tx[~sfIssuer].value_or(account);
auto const credType(ctx.tx[sfCredentialType]);
if (!ctx.view.exists(keylet::credential(subject, issuer, credType)))
return tecNO_ENTRY;
return tesSUCCESS;
}
TER
CredentialDelete::doApply()
{
auto const subject = ctx_.tx[~sfSubject].value_or(account_);
auto const issuer = ctx_.tx[~sfIssuer].value_or(account_);
auto const credType(ctx_.tx[sfCredentialType]);
auto const sleCred = view().peek(keylet::credential(subject, issuer, credType));
if (!sleCred)
return tefINTERNAL; // LCOV_EXCL_LINE
if ((subject != account_) && (issuer != account_) &&
!checkExpired(sleCred, ctx_.view().header().parentCloseTime))
{
JLOG(j_.trace()) << "Can't delete non-expired credential.";
return tecNO_PERMISSION;
}
return deleteSLE(view(), sleCred, j_);
}
// ------- APPLY --------------------------
std::uint32_t
CredentialAccept::getFlagsMask(PreflightContext const& ctx)
{
// 0 means "Allow any flags"
return ctx.rules.enabled(fixInvalidTxFlags) ? tfUniversalMask : 0;
}
NotTEC
CredentialAccept::preflight(PreflightContext const& ctx)
{
if (!ctx.tx[sfIssuer])
{
JLOG(ctx.j.trace()) << "Malformed transaction: Issuer field zeroed.";
return temINVALID_ACCOUNT_ID;
}
auto const credType = ctx.tx[sfCredentialType];
if (credType.empty() || (credType.size() > maxCredentialTypeLength))
{
JLOG(ctx.j.trace()) << "Malformed transaction: invalid size of CredentialType.";
return temMALFORMED;
}
return tesSUCCESS;
}
TER
CredentialAccept::preclaim(PreclaimContext const& ctx)
{
AccountID const subject = ctx.tx[sfAccount];
AccountID const issuer = ctx.tx[sfIssuer];
auto const credType(ctx.tx[sfCredentialType]);
if (!ctx.view.exists(keylet::account(issuer)))
{
JLOG(ctx.j.warn()) << "No issuer: " << to_string(issuer);
return tecNO_ISSUER;
}
auto const sleCred = ctx.view.read(keylet::credential(subject, issuer, credType));
if (!sleCred)
{
JLOG(ctx.j.warn()) << "No credential: " << to_string(subject) << ", " << to_string(issuer)
<< ", " << credType;
return tecNO_ENTRY;
}
if (sleCred->getFieldU32(sfFlags) & lsfAccepted)
{
JLOG(ctx.j.warn()) << "Credential already accepted: " << to_string(subject) << ", "
<< to_string(issuer) << ", " << credType;
return tecDUPLICATE;
}
return tesSUCCESS;
}
TER
CredentialAccept::doApply()
{
AccountID const issuer{ctx_.tx[sfIssuer]};
// Both exist as credential object exist itself (checked in preclaim)
auto const sleSubject = view().peek(keylet::account(account_));
auto const sleIssuer = view().peek(keylet::account(issuer));
if (!sleSubject || !sleIssuer)
return tefINTERNAL; // LCOV_EXCL_LINE
{
STAmount const reserve{
view().fees().accountReserve(sleSubject->getFieldU32(sfOwnerCount) + 1)};
if (mPriorBalance < reserve)
return tecINSUFFICIENT_RESERVE;
}
auto const credType(ctx_.tx[sfCredentialType]);
Keylet const credentialKey = keylet::credential(account_, issuer, credType);
auto const sleCred = view().peek(credentialKey); // Checked in preclaim()
if (checkExpired(sleCred, view().header().parentCloseTime))
{
JLOG(j_.trace()) << "Credential is expired: " << sleCred->getText();
// delete expired credentials even if the transaction failed
auto const err = credentials::deleteSLE(view(), sleCred, j_);
return isTesSuccess(err) ? tecEXPIRED : err;
}
sleCred->setFieldU32(sfFlags, lsfAccepted);
view().update(sleCred);
adjustOwnerCount(view(), sleIssuer, -1, j_);
adjustOwnerCount(view(), sleSubject, 1, j_);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -4,7 +4,7 @@
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/did/DIDSet.h>
#include <xrpl/tx/transactors/did/DID.h>
namespace xrpl {
@@ -134,4 +134,54 @@ DIDSet::doApply()
return addSLE(ctx_, sleDID, account_);
}
NotTEC
DIDDelete::preflight(PreflightContext const& ctx)
{
return tesSUCCESS;
}
TER
DIDDelete::deleteSLE(ApplyContext& ctx, Keylet sleKeylet, AccountID const owner)
{
auto const sle = ctx.view().peek(sleKeylet);
if (!sle)
return tecNO_ENTRY;
return DIDDelete::deleteSLE(ctx.view(), sle, owner, ctx.journal);
}
TER
DIDDelete::deleteSLE(
ApplyView& view,
std::shared_ptr<SLE> sle,
AccountID const owner,
beast::Journal j)
{
// Remove object from owner directory
if (!view.dirRemove(keylet::ownerDir(owner), (*sle)[sfOwnerNode], sle->key(), true))
{
// LCOV_EXCL_START
JLOG(j.fatal()) << "Unable to delete DID Token from owner.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
auto const sleOwner = view.peek(keylet::account(owner));
if (!sleOwner)
return tecINTERNAL; // LCOV_EXCL_LINE
adjustOwnerCount(view, sleOwner, -1, j);
view.update(sleOwner);
// Remove object from ledger
view.erase(sle);
return tesSUCCESS;
}
TER
DIDDelete::doApply()
{
return deleteSLE(ctx_, keylet::did(account_), account_);
}
} // namespace xrpl

View File

@@ -1,59 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/tx/transactors/did/DIDDelete.h>
namespace xrpl {
NotTEC
DIDDelete::preflight(PreflightContext const& ctx)
{
return tesSUCCESS;
}
TER
DIDDelete::deleteSLE(ApplyContext& ctx, Keylet sleKeylet, AccountID const owner)
{
auto const sle = ctx.view().peek(sleKeylet);
if (!sle)
return tecNO_ENTRY;
return DIDDelete::deleteSLE(ctx.view(), sle, owner, ctx.journal);
}
TER
DIDDelete::deleteSLE(
ApplyView& view,
std::shared_ptr<SLE> sle,
AccountID const owner,
beast::Journal j)
{
// Remove object from owner directory
if (!view.dirRemove(keylet::ownerDir(owner), (*sle)[sfOwnerNode], sle->key(), true))
{
// LCOV_EXCL_START
JLOG(j.fatal()) << "Unable to delete DID Token from owner.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
auto const sleOwner = view.peek(keylet::account(owner));
if (!sleOwner)
return tecINTERNAL; // LCOV_EXCL_LINE
adjustOwnerCount(view, sleOwner, -1, j);
view.update(sleOwner);
// Remove object from ledger
view.erase(sle);
return tesSUCCESS;
}
TER
DIDDelete::doApply()
{
return deleteSLE(ctx_, keylet::did(account_), account_);
}
} // namespace xrpl

File diff suppressed because it is too large Load Diff

View File

@@ -1,201 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/tx/transactors/escrow/EscrowCancel.h>
#include <libxrpl/tx/transactors/escrow/EscrowHelpers.h>
namespace xrpl {
NotTEC
EscrowCancel::preflight(PreflightContext const& ctx)
{
return tesSUCCESS;
}
template <ValidIssueType T>
static TER
escrowCancelPreclaimHelper(
PreclaimContext const& ctx,
AccountID const& account,
STAmount const& amount);
template <>
TER
escrowCancelPreclaimHelper<Issue>(
PreclaimContext const& ctx,
AccountID const& account,
STAmount const& amount)
{
AccountID issuer = amount.getIssuer();
// If the issuer is the same as the account, return tecINTERNAL
if (issuer == account)
return tecINTERNAL; // LCOV_EXCL_LINE
// If the issuer has requireAuth set, check if the account is authorized
if (auto const ter = requireAuth(ctx.view, amount.issue(), account); ter != tesSUCCESS)
return ter;
return tesSUCCESS;
}
template <>
TER
escrowCancelPreclaimHelper<MPTIssue>(
PreclaimContext const& ctx,
AccountID const& account,
STAmount const& amount)
{
AccountID issuer = amount.getIssuer();
// If the issuer is the same as the account, return tecINTERNAL
if (issuer == account)
return tecINTERNAL; // LCOV_EXCL_LINE
// If the mpt does not exist, return tecOBJECT_NOT_FOUND
auto const issuanceKey = keylet::mptIssuance(amount.get<MPTIssue>().getMptID());
auto const sleIssuance = ctx.view.read(issuanceKey);
if (!sleIssuance)
return tecOBJECT_NOT_FOUND;
// If the issuer has requireAuth set, check if the account is
// authorized
auto const& mptIssue = amount.get<MPTIssue>();
if (auto const ter = requireAuth(ctx.view, mptIssue, account, AuthType::WeakAuth);
ter != tesSUCCESS)
return ter;
return tesSUCCESS;
}
TER
EscrowCancel::preclaim(PreclaimContext const& ctx)
{
if (ctx.view.rules().enabled(featureTokenEscrow))
{
auto const k = keylet::escrow(ctx.tx[sfOwner], ctx.tx[sfOfferSequence]);
auto const slep = ctx.view.read(k);
if (!slep)
return tecNO_TARGET;
AccountID const account = (*slep)[sfAccount];
STAmount const amount = (*slep)[sfAmount];
if (!isXRP(amount))
{
if (auto const ret = std::visit(
[&]<typename T>(T const&) {
return escrowCancelPreclaimHelper<T>(ctx, account, amount);
},
amount.asset().value());
!isTesSuccess(ret))
return ret;
}
}
return tesSUCCESS;
}
TER
EscrowCancel::doApply()
{
auto const k = keylet::escrow(ctx_.tx[sfOwner], ctx_.tx[sfOfferSequence]);
auto const slep = ctx_.view().peek(k);
if (!slep)
{
if (ctx_.view().rules().enabled(featureTokenEscrow))
return tecINTERNAL; // LCOV_EXCL_LINE
return tecNO_TARGET;
}
auto const now = ctx_.view().header().parentCloseTime;
// No cancel time specified: can't execute at all.
if (!(*slep)[~sfCancelAfter])
return tecNO_PERMISSION;
// Too soon: can't execute before the cancel time.
if (!after(now, (*slep)[sfCancelAfter]))
return tecNO_PERMISSION;
AccountID const account = (*slep)[sfAccount];
// Remove escrow from owner directory
{
auto const page = (*slep)[sfOwnerNode];
if (!ctx_.view().dirRemove(keylet::ownerDir(account), page, k.key, true))
{
// LCOV_EXCL_START
JLOG(j_.fatal()) << "Unable to delete Escrow from owner.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
// Remove escrow from recipient's owner directory, if present.
if (auto const optPage = (*slep)[~sfDestinationNode]; optPage)
{
if (!ctx_.view().dirRemove(keylet::ownerDir((*slep)[sfDestination]), *optPage, k.key, true))
{
// LCOV_EXCL_START
JLOG(j_.fatal()) << "Unable to delete Escrow from recipient.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
auto const sle = ctx_.view().peek(keylet::account(account));
STAmount const amount = slep->getFieldAmount(sfAmount);
// Transfer amount back to the owner
if (isXRP(amount))
(*sle)[sfBalance] = (*sle)[sfBalance] + amount;
else
{
if (!ctx_.view().rules().enabled(featureTokenEscrow))
return temDISABLED; // LCOV_EXCL_LINE
auto const issuer = amount.getIssuer();
bool const createAsset = account == account_;
if (auto const ret = std::visit(
[&]<typename T>(T const&) {
return escrowUnlockApplyHelper<T>(
ctx_.view(),
parityRate,
slep,
mPriorBalance,
amount,
issuer,
account, // sender and receiver are the same
account,
createAsset,
j_);
},
amount.asset().value());
!isTesSuccess(ret))
return ret; // LCOV_EXCL_LINE
// Remove escrow from issuers owner directory, if present.
if (auto const optPage = (*slep)[~sfIssuerNode]; optPage)
{
if (!ctx_.view().dirRemove(keylet::ownerDir(issuer), *optPage, k.key, true))
{
// LCOV_EXCL_START
JLOG(j_.fatal()) << "Unable to delete Escrow from recipient.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
}
adjustOwnerCount(ctx_.view(), sle, -1, ctx_.journal);
ctx_.view().update(sle);
// Remove escrow from ledger
ctx_.view().erase(slep);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,498 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/conditions/Condition.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/MPTAmount.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/tx/transactors/escrow/EscrowCreate.h>
namespace xrpl {
/*
Escrow
======
Escrow is a feature of the XRP Ledger that allows you to send conditional
XRP payments. These conditional payments, called escrows, set aside XRP and
deliver it later when certain conditions are met. Conditions to successfully
finish an escrow include time-based unlocks and crypto-conditions. Escrows
can also be set to expire if not finished in time.
The XRP set aside in an escrow is locked up. No one can use or destroy the
XRP until the escrow has been successfully finished or canceled. Before the
expiration time, only the intended receiver can get the XRP. After the
expiration time, the XRP can only be returned to the sender.
For more details on escrow, including examples, diagrams and more please
visit https://xrpl.org/escrow.html
For details on specific transactions, including fields and validation rules
please see:
`EscrowCreate`
--------------
See: https://xrpl.org/escrowcreate.html
`EscrowFinish`
--------------
See: https://xrpl.org/escrowfinish.html
`EscrowCancel`
--------------
See: https://xrpl.org/escrowcancel.html
*/
//------------------------------------------------------------------------------
TxConsequences
EscrowCreate::makeTxConsequences(PreflightContext const& ctx)
{
auto const amount = ctx.tx[sfAmount];
return TxConsequences{ctx.tx, isXRP(amount) ? amount.xrp() : beast::zero};
}
template <ValidIssueType T>
static NotTEC
escrowCreatePreflightHelper(PreflightContext const& ctx);
template <>
NotTEC
escrowCreatePreflightHelper<Issue>(PreflightContext const& ctx)
{
STAmount const amount = ctx.tx[sfAmount];
if (amount.native() || amount <= beast::zero)
return temBAD_AMOUNT;
if (badCurrency() == amount.getCurrency())
return temBAD_CURRENCY;
return tesSUCCESS;
}
template <>
NotTEC
escrowCreatePreflightHelper<MPTIssue>(PreflightContext const& ctx)
{
if (!ctx.rules.enabled(featureMPTokensV1))
return temDISABLED;
auto const amount = ctx.tx[sfAmount];
if (amount.native() || amount.mpt() > MPTAmount{maxMPTokenAmount} || amount <= beast::zero)
return temBAD_AMOUNT;
return tesSUCCESS;
}
NotTEC
EscrowCreate::preflight(PreflightContext const& ctx)
{
STAmount const amount{ctx.tx[sfAmount]};
if (!isXRP(amount))
{
if (!ctx.rules.enabled(featureTokenEscrow))
return temBAD_AMOUNT;
if (auto const ret = std::visit(
[&]<typename T>(T const&) { return escrowCreatePreflightHelper<T>(ctx); },
amount.asset().value());
!isTesSuccess(ret))
return ret;
}
else
{
if (amount <= beast::zero)
return temBAD_AMOUNT;
}
// We must specify at least one timeout value
if (!ctx.tx[~sfCancelAfter] && !ctx.tx[~sfFinishAfter])
return temBAD_EXPIRATION;
// If both finish and cancel times are specified then the cancel time must
// be strictly after the finish time.
if (ctx.tx[~sfCancelAfter] && ctx.tx[~sfFinishAfter] &&
ctx.tx[sfCancelAfter] <= ctx.tx[sfFinishAfter])
return temBAD_EXPIRATION;
// In the absence of a FinishAfter, the escrow can be finished
// immediately, which can be confusing. When creating an escrow,
// we want to ensure that either a FinishAfter time is explicitly
// specified or a completion condition is attached.
if (!ctx.tx[~sfFinishAfter] && !ctx.tx[~sfCondition])
return temMALFORMED;
if (auto const cb = ctx.tx[~sfCondition])
{
using namespace xrpl::cryptoconditions;
std::error_code ec;
auto condition = Condition::deserialize(*cb, ec);
if (!condition)
{
JLOG(ctx.j.debug()) << "Malformed condition during escrow creation: " << ec.message();
return temMALFORMED;
}
}
return tesSUCCESS;
}
template <ValidIssueType T>
static TER
escrowCreatePreclaimHelper(
PreclaimContext const& ctx,
AccountID const& account,
AccountID const& dest,
STAmount const& amount);
template <>
TER
escrowCreatePreclaimHelper<Issue>(
PreclaimContext const& ctx,
AccountID const& account,
AccountID const& dest,
STAmount const& amount)
{
AccountID issuer = amount.getIssuer();
// If the issuer is the same as the account, return tecNO_PERMISSION
if (issuer == account)
return tecNO_PERMISSION;
// If the lsfAllowTrustLineLocking is not enabled, return tecNO_PERMISSION
auto const sleIssuer = ctx.view.read(keylet::account(issuer));
if (!sleIssuer)
return tecNO_ISSUER;
if (!sleIssuer->isFlag(lsfAllowTrustLineLocking))
return tecNO_PERMISSION;
// If the account does not have a trustline to the issuer, return tecNO_LINE
auto const sleRippleState = ctx.view.read(keylet::line(account, issuer, amount.getCurrency()));
if (!sleRippleState)
return tecNO_LINE;
STAmount const balance = (*sleRippleState)[sfBalance];
// If balance is positive, issuer must have higher address than account
if (balance > beast::zero && issuer < account)
return tecNO_PERMISSION; // LCOV_EXCL_LINE
// If balance is negative, issuer must have lower address than account
if (balance < beast::zero && issuer > account)
return tecNO_PERMISSION; // LCOV_EXCL_LINE
// If the issuer has requireAuth set, check if the account is authorized
if (auto const ter = requireAuth(ctx.view, amount.issue(), account); ter != tesSUCCESS)
return ter;
// If the issuer has requireAuth set, check if the destination is authorized
if (auto const ter = requireAuth(ctx.view, amount.issue(), dest); ter != tesSUCCESS)
return ter;
// If the issuer has frozen the account, return tecFROZEN
if (isFrozen(ctx.view, account, amount.issue()))
return tecFROZEN;
// If the issuer has frozen the destination, return tecFROZEN
if (isFrozen(ctx.view, dest, amount.issue()))
return tecFROZEN;
STAmount const spendableAmount =
accountHolds(ctx.view, account, amount.getCurrency(), issuer, fhIGNORE_FREEZE, ctx.j);
// If the balance is less than or equal to 0, return tecINSUFFICIENT_FUNDS
if (spendableAmount <= beast::zero)
return tecINSUFFICIENT_FUNDS;
// If the spendable amount is less than the amount, return
// tecINSUFFICIENT_FUNDS
if (spendableAmount < amount)
return tecINSUFFICIENT_FUNDS;
// If the amount is not addable to the balance, return tecPRECISION_LOSS
if (!canAdd(spendableAmount, amount))
return tecPRECISION_LOSS;
return tesSUCCESS;
}
template <>
TER
escrowCreatePreclaimHelper<MPTIssue>(
PreclaimContext const& ctx,
AccountID const& account,
AccountID const& dest,
STAmount const& amount)
{
AccountID issuer = amount.getIssuer();
// If the issuer is the same as the account, return tecNO_PERMISSION
if (issuer == account)
return tecNO_PERMISSION;
// If the mpt does not exist, return tecOBJECT_NOT_FOUND
auto const issuanceKey = keylet::mptIssuance(amount.get<MPTIssue>().getMptID());
auto const sleIssuance = ctx.view.read(issuanceKey);
if (!sleIssuance)
return tecOBJECT_NOT_FOUND;
// If the lsfMPTCanEscrow is not enabled, return tecNO_PERMISSION
if (!sleIssuance->isFlag(lsfMPTCanEscrow))
return tecNO_PERMISSION;
// If the issuer is not the same as the issuer of the mpt, return
// tecNO_PERMISSION
if (sleIssuance->getAccountID(sfIssuer) != issuer)
return tecNO_PERMISSION; // LCOV_EXCL_LINE
// If the account does not have the mpt, return tecOBJECT_NOT_FOUND
if (!ctx.view.exists(keylet::mptoken(issuanceKey.key, account)))
return tecOBJECT_NOT_FOUND;
// If the issuer has requireAuth set, check if the account is
// authorized
auto const& mptIssue = amount.get<MPTIssue>();
if (auto const ter = requireAuth(ctx.view, mptIssue, account, AuthType::WeakAuth);
ter != tesSUCCESS)
return ter;
// If the issuer has requireAuth set, check if the destination is
// authorized
if (auto const ter = requireAuth(ctx.view, mptIssue, dest, AuthType::WeakAuth);
ter != tesSUCCESS)
return ter;
// If the issuer has frozen the account, return tecLOCKED
if (isFrozen(ctx.view, account, mptIssue))
return tecLOCKED;
// If the issuer has frozen the destination, return tecLOCKED
if (isFrozen(ctx.view, dest, mptIssue))
return tecLOCKED;
// If the mpt cannot be transferred, return tecNO_AUTH
if (auto const ter = canTransfer(ctx.view, mptIssue, account, dest); ter != tesSUCCESS)
return ter;
STAmount const spendableAmount = accountHolds(
ctx.view, account, amount.get<MPTIssue>(), fhIGNORE_FREEZE, ahIGNORE_AUTH, ctx.j);
// If the balance is less than or equal to 0, return tecINSUFFICIENT_FUNDS
if (spendableAmount <= beast::zero)
return tecINSUFFICIENT_FUNDS;
// If the spendable amount is less than the amount, return
// tecINSUFFICIENT_FUNDS
if (spendableAmount < amount)
return tecINSUFFICIENT_FUNDS;
return tesSUCCESS;
}
TER
EscrowCreate::preclaim(PreclaimContext const& ctx)
{
STAmount const amount{ctx.tx[sfAmount]};
AccountID const account{ctx.tx[sfAccount]};
AccountID const dest{ctx.tx[sfDestination]};
auto const sled = ctx.view.read(keylet::account(dest));
if (!sled)
return tecNO_DST;
// Pseudo-accounts cannot receive escrow. Note, this is not amendment-gated
// because all writes to pseudo-account discriminator fields **are**
// amendment gated, hence the behaviour of this check will always match the
// currently active amendments.
if (isPseudoAccount(sled))
return tecNO_PERMISSION;
if (!isXRP(amount))
{
if (!ctx.view.rules().enabled(featureTokenEscrow))
return temDISABLED; // LCOV_EXCL_LINE
if (auto const ret = std::visit(
[&]<typename T>(T const&) {
return escrowCreatePreclaimHelper<T>(ctx, account, dest, amount);
},
amount.asset().value());
!isTesSuccess(ret))
return ret;
}
return tesSUCCESS;
}
template <ValidIssueType T>
static TER
escrowLockApplyHelper(
ApplyView& view,
AccountID const& issuer,
AccountID const& sender,
STAmount const& amount,
beast::Journal journal);
template <>
TER
escrowLockApplyHelper<Issue>(
ApplyView& view,
AccountID const& issuer,
AccountID const& sender,
STAmount const& amount,
beast::Journal journal)
{
// Defensive: Issuer cannot create an escrow
if (issuer == sender)
return tecINTERNAL; // LCOV_EXCL_LINE
auto const ter = rippleCredit(
view, sender, issuer, amount, amount.holds<MPTIssue>() ? false : true, journal);
if (ter != tesSUCCESS)
return ter; // LCOV_EXCL_LINE
return tesSUCCESS;
}
template <>
TER
escrowLockApplyHelper<MPTIssue>(
ApplyView& view,
AccountID const& issuer,
AccountID const& sender,
STAmount const& amount,
beast::Journal journal)
{
// Defensive: Issuer cannot create an escrow
if (issuer == sender)
return tecINTERNAL; // LCOV_EXCL_LINE
auto const ter = rippleLockEscrowMPT(view, sender, amount, journal);
if (ter != tesSUCCESS)
return ter; // LCOV_EXCL_LINE
return tesSUCCESS;
}
TER
EscrowCreate::doApply()
{
auto const closeTime = ctx_.view().header().parentCloseTime;
if (ctx_.tx[~sfCancelAfter] && after(closeTime, ctx_.tx[sfCancelAfter]))
return tecNO_PERMISSION;
if (ctx_.tx[~sfFinishAfter] && after(closeTime, ctx_.tx[sfFinishAfter]))
return tecNO_PERMISSION;
auto const sle = ctx_.view().peek(keylet::account(account_));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
// Check reserve and funds availability
STAmount const amount{ctx_.tx[sfAmount]};
auto const reserve = ctx_.view().fees().accountReserve((*sle)[sfOwnerCount] + 1);
if (mSourceBalance < reserve)
return tecINSUFFICIENT_RESERVE;
// Check reserve and funds availability
if (isXRP(amount))
{
if (mSourceBalance < reserve + STAmount(amount).xrp())
return tecUNFUNDED;
}
// Check destination account
{
auto const sled = ctx_.view().read(keylet::account(ctx_.tx[sfDestination]));
if (!sled)
return tecNO_DST; // LCOV_EXCL_LINE
if (((*sled)[sfFlags] & lsfRequireDestTag) && !ctx_.tx[~sfDestinationTag])
return tecDST_TAG_NEEDED;
}
// Create escrow in ledger. Note that we we use the value from the
// sequence or ticket. For more explanation see comments in SeqProxy.h.
Keylet const escrowKeylet = keylet::escrow(account_, ctx_.tx.getSeqValue());
auto const slep = std::make_shared<SLE>(escrowKeylet);
(*slep)[sfAmount] = amount;
(*slep)[sfAccount] = account_;
(*slep)[~sfCondition] = ctx_.tx[~sfCondition];
(*slep)[~sfSourceTag] = ctx_.tx[~sfSourceTag];
(*slep)[sfDestination] = ctx_.tx[sfDestination];
(*slep)[~sfCancelAfter] = ctx_.tx[~sfCancelAfter];
(*slep)[~sfFinishAfter] = ctx_.tx[~sfFinishAfter];
(*slep)[~sfDestinationTag] = ctx_.tx[~sfDestinationTag];
if (ctx_.view().rules().enabled(fixIncludeKeyletFields))
{
(*slep)[sfSequence] = ctx_.tx.getSeqValue();
}
if (ctx_.view().rules().enabled(featureTokenEscrow) && !isXRP(amount))
{
auto const xferRate = transferRate(ctx_.view(), amount);
if (xferRate != parityRate)
(*slep)[sfTransferRate] = xferRate.value;
}
ctx_.view().insert(slep);
// Add escrow to sender's owner directory
{
auto page = ctx_.view().dirInsert(
keylet::ownerDir(account_), escrowKeylet, describeOwnerDir(account_));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfOwnerNode] = *page;
}
// If it's not a self-send, add escrow to recipient's owner directory.
AccountID const dest = ctx_.tx[sfDestination];
if (dest != account_)
{
auto page =
ctx_.view().dirInsert(keylet::ownerDir(dest), escrowKeylet, describeOwnerDir(dest));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfDestinationNode] = *page;
}
// IOU escrow objects are added to the issuer's owner directory to help
// track the total locked balance. For MPT, this isn't necessary because the
// locked balance is already stored directly in the MPTokenIssuance object.
AccountID const issuer = amount.getIssuer();
if (!isXRP(amount) && issuer != account_ && issuer != dest && !amount.holds<MPTIssue>())
{
auto page =
ctx_.view().dirInsert(keylet::ownerDir(issuer), escrowKeylet, describeOwnerDir(issuer));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfIssuerNode] = *page;
}
// Deduct owner's balance
if (isXRP(amount))
(*sle)[sfBalance] = (*sle)[sfBalance] - amount;
else
{
if (auto const ret = std::visit(
[&]<typename T>(T const&) {
return escrowLockApplyHelper<T>(ctx_.view(), issuer, account_, amount, j_);
},
amount.asset().value());
!isTesSuccess(ret))
{
return ret; // LCOV_EXCL_LINE
}
}
// increment owner count
adjustOwnerCount(ctx_.view(), sle, 1, ctx_.journal);
ctx_.view().update(sle);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,372 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/conditions/Condition.h>
#include <xrpl/conditions/Fulfillment.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/tx/transactors/escrow/EscrowFinish.h>
#include <libxrpl/tx/transactors/escrow/EscrowHelpers.h>
namespace xrpl {
// During an EscrowFinish, the transaction must specify both
// a condition and a fulfillment. We track whether that
// fulfillment matches and validates the condition.
constexpr HashRouterFlags SF_CF_INVALID = HashRouterFlags::PRIVATE5;
constexpr HashRouterFlags SF_CF_VALID = HashRouterFlags::PRIVATE6;
//------------------------------------------------------------------------------
static bool
checkCondition(Slice f, Slice c)
{
using namespace xrpl::cryptoconditions;
std::error_code ec;
auto condition = Condition::deserialize(c, ec);
if (!condition)
return false;
auto fulfillment = Fulfillment::deserialize(f, ec);
if (!fulfillment)
return false;
return validate(*fulfillment, *condition);
}
bool
EscrowFinish::checkExtraFeatures(PreflightContext const& ctx)
{
return !ctx.tx.isFieldPresent(sfCredentialIDs) || ctx.rules.enabled(featureCredentials);
}
NotTEC
EscrowFinish::preflight(PreflightContext const& ctx)
{
auto const cb = ctx.tx[~sfCondition];
auto const fb = ctx.tx[~sfFulfillment];
// If you specify a condition, then you must also specify
// a fulfillment.
if (static_cast<bool>(cb) != static_cast<bool>(fb))
return temMALFORMED;
return tesSUCCESS;
}
NotTEC
EscrowFinish::preflightSigValidated(PreflightContext const& ctx)
{
auto const cb = ctx.tx[~sfCondition];
auto const fb = ctx.tx[~sfFulfillment];
if (cb && fb)
{
auto& router = ctx.registry.getHashRouter();
auto const id = ctx.tx.getTransactionID();
auto const flags = router.getFlags(id);
// If we haven't checked the condition, check it
// now. Whether it passes or not isn't important
// in preflight.
if (!any(flags & (SF_CF_INVALID | SF_CF_VALID)))
{
if (checkCondition(*fb, *cb))
router.setFlags(id, SF_CF_VALID);
else
router.setFlags(id, SF_CF_INVALID);
}
}
if (auto const err = credentials::checkFields(ctx.tx, ctx.j); !isTesSuccess(err))
return err;
return tesSUCCESS;
}
XRPAmount
EscrowFinish::calculateBaseFee(ReadView const& view, STTx const& tx)
{
XRPAmount extraFee{0};
if (auto const fb = tx[~sfFulfillment])
{
extraFee += view.fees().base * (32 + (fb->size() / 16));
}
return Transactor::calculateBaseFee(view, tx) + extraFee;
}
template <ValidIssueType T>
static TER
escrowFinishPreclaimHelper(
PreclaimContext const& ctx,
AccountID const& dest,
STAmount const& amount);
template <>
TER
escrowFinishPreclaimHelper<Issue>(
PreclaimContext const& ctx,
AccountID const& dest,
STAmount const& amount)
{
AccountID issuer = amount.getIssuer();
// If the issuer is the same as the account, return tesSUCCESS
if (issuer == dest)
return tesSUCCESS;
// If the issuer has requireAuth set, check if the destination is authorized
if (auto const ter = requireAuth(ctx.view, amount.issue(), dest); ter != tesSUCCESS)
return ter;
// If the issuer has deep frozen the destination, return tecFROZEN
if (isDeepFrozen(ctx.view, dest, amount.getCurrency(), amount.getIssuer()))
return tecFROZEN;
return tesSUCCESS;
}
template <>
TER
escrowFinishPreclaimHelper<MPTIssue>(
PreclaimContext const& ctx,
AccountID const& dest,
STAmount const& amount)
{
AccountID issuer = amount.getIssuer();
// If the issuer is the same as the dest, return tesSUCCESS
if (issuer == dest)
return tesSUCCESS;
// If the mpt does not exist, return tecOBJECT_NOT_FOUND
auto const issuanceKey = keylet::mptIssuance(amount.get<MPTIssue>().getMptID());
auto const sleIssuance = ctx.view.read(issuanceKey);
if (!sleIssuance)
return tecOBJECT_NOT_FOUND;
// If the issuer has requireAuth set, check if the destination is
// authorized
auto const& mptIssue = amount.get<MPTIssue>();
if (auto const ter = requireAuth(ctx.view, mptIssue, dest, AuthType::WeakAuth);
ter != tesSUCCESS)
return ter;
// If the issuer has frozen the destination, return tecLOCKED
if (isFrozen(ctx.view, dest, mptIssue))
return tecLOCKED;
return tesSUCCESS;
}
TER
EscrowFinish::preclaim(PreclaimContext const& ctx)
{
if (ctx.view.rules().enabled(featureCredentials))
{
if (auto const err = credentials::valid(ctx.tx, ctx.view, ctx.tx[sfAccount], ctx.j);
!isTesSuccess(err))
return err;
}
if (ctx.view.rules().enabled(featureTokenEscrow))
{
auto const k = keylet::escrow(ctx.tx[sfOwner], ctx.tx[sfOfferSequence]);
auto const slep = ctx.view.read(k);
if (!slep)
return tecNO_TARGET;
AccountID const dest = (*slep)[sfDestination];
STAmount const amount = (*slep)[sfAmount];
if (!isXRP(amount))
{
if (auto const ret = std::visit(
[&]<typename T>(T const&) {
return escrowFinishPreclaimHelper<T>(ctx, dest, amount);
},
amount.asset().value());
!isTesSuccess(ret))
return ret;
}
}
return tesSUCCESS;
}
TER
EscrowFinish::doApply()
{
auto const k = keylet::escrow(ctx_.tx[sfOwner], ctx_.tx[sfOfferSequence]);
auto const slep = ctx_.view().peek(k);
if (!slep)
{
if (ctx_.view().rules().enabled(featureTokenEscrow))
return tecINTERNAL; // LCOV_EXCL_LINE
return tecNO_TARGET;
}
// If a cancel time is present, a finish operation should only succeed prior
// to that time.
auto const now = ctx_.view().header().parentCloseTime;
// Too soon: can't execute before the finish time
if ((*slep)[~sfFinishAfter] && !after(now, (*slep)[sfFinishAfter]))
return tecNO_PERMISSION;
// Too late: can't execute after the cancel time
if ((*slep)[~sfCancelAfter] && after(now, (*slep)[sfCancelAfter]))
return tecNO_PERMISSION;
// Check cryptocondition fulfillment
{
auto const id = ctx_.tx.getTransactionID();
auto flags = ctx_.registry.getHashRouter().getFlags(id);
auto const cb = ctx_.tx[~sfCondition];
// It's unlikely that the results of the check will
// expire from the hash router, but if it happens,
// simply re-run the check.
if (cb && !any(flags & (SF_CF_INVALID | SF_CF_VALID)))
{
// LCOV_EXCL_START
auto const fb = ctx_.tx[~sfFulfillment];
if (!fb)
return tecINTERNAL;
if (checkCondition(*fb, *cb))
flags = SF_CF_VALID;
else
flags = SF_CF_INVALID;
ctx_.registry.getHashRouter().setFlags(id, flags);
// LCOV_EXCL_STOP
}
// If the check failed, then simply return an error
// and don't look at anything else.
if (any(flags & SF_CF_INVALID))
return tecCRYPTOCONDITION_ERROR;
// Check against condition in the ledger entry:
auto const cond = (*slep)[~sfCondition];
// If a condition wasn't specified during creation,
// one shouldn't be included now.
if (!cond && cb)
return tecCRYPTOCONDITION_ERROR;
// If a condition was specified during creation of
// the suspended payment, the identical condition
// must be presented again. We don't check if the
// fulfillment matches the condition since we did
// that in preflight.
if (cond && (cond != cb))
return tecCRYPTOCONDITION_ERROR;
}
// NOTE: Escrow payments cannot be used to fund accounts.
AccountID const destID = (*slep)[sfDestination];
auto const sled = ctx_.view().peek(keylet::account(destID));
if (!sled)
return tecNO_DST;
if (auto err = verifyDepositPreauth(ctx_.tx, ctx_.view(), account_, destID, sled, ctx_.journal);
!isTesSuccess(err))
return err;
AccountID const account = (*slep)[sfAccount];
// Remove escrow from owner directory
{
auto const page = (*slep)[sfOwnerNode];
if (!ctx_.view().dirRemove(keylet::ownerDir(account), page, k.key, true))
{
// LCOV_EXCL_START
JLOG(j_.fatal()) << "Unable to delete Escrow from owner.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
// Remove escrow from recipient's owner directory, if present.
if (auto const optPage = (*slep)[~sfDestinationNode])
{
if (!ctx_.view().dirRemove(keylet::ownerDir(destID), *optPage, k.key, true))
{
// LCOV_EXCL_START
JLOG(j_.fatal()) << "Unable to delete Escrow from recipient.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
STAmount const amount = slep->getFieldAmount(sfAmount);
// Transfer amount to destination
if (isXRP(amount))
(*sled)[sfBalance] = (*sled)[sfBalance] + amount;
else
{
if (!ctx_.view().rules().enabled(featureTokenEscrow))
return temDISABLED; // LCOV_EXCL_LINE
Rate lockedRate = slep->isFieldPresent(sfTransferRate)
? xrpl::Rate(slep->getFieldU32(sfTransferRate))
: parityRate;
auto const issuer = amount.getIssuer();
bool const createAsset = destID == account_;
if (auto const ret = std::visit(
[&]<typename T>(T const&) {
return escrowUnlockApplyHelper<T>(
ctx_.view(),
lockedRate,
sled,
mPriorBalance,
amount,
issuer,
account,
destID,
createAsset,
j_);
},
amount.asset().value());
!isTesSuccess(ret))
return ret;
// Remove escrow from issuers owner directory, if present.
if (auto const optPage = (*slep)[~sfIssuerNode]; optPage)
{
if (!ctx_.view().dirRemove(keylet::ownerDir(issuer), *optPage, k.key, true))
{
// LCOV_EXCL_START
JLOG(j_.fatal()) << "Unable to delete Escrow from recipient.";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
}
ctx_.view().update(sled);
// Adjust source owner count
auto const sle = ctx_.view().peek(keylet::account(account));
adjustOwnerCount(ctx_.view(), sle, -1, ctx_.journal);
ctx_.view().update(sle);
// Remove escrow from ledger
ctx_.view().erase(slep);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,229 +0,0 @@
#pragma once
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/MPTAmount.h>
#include <xrpl/tx/transactors/token/MPTokenAuthorize.h>
namespace xrpl {
template <ValidIssueType T>
TER
escrowUnlockApplyHelper(
ApplyView& view,
Rate lockedRate,
std::shared_ptr<SLE> const& sleDest,
STAmount const& xrpBalance,
STAmount const& amount,
AccountID const& issuer,
AccountID const& sender,
AccountID const& receiver,
bool createAsset,
beast::Journal journal);
template <>
inline TER
escrowUnlockApplyHelper<Issue>(
ApplyView& view,
Rate lockedRate,
std::shared_ptr<SLE> const& sleDest,
STAmount const& xrpBalance,
STAmount const& amount,
AccountID const& issuer,
AccountID const& sender,
AccountID const& receiver,
bool createAsset,
beast::Journal journal)
{
Keylet const trustLineKey = keylet::line(receiver, amount.issue());
bool const recvLow = issuer > receiver;
bool const senderIssuer = issuer == sender;
bool const receiverIssuer = issuer == receiver;
bool const issuerHigh = issuer > receiver;
if (senderIssuer)
return tecINTERNAL; // LCOV_EXCL_LINE
if (receiverIssuer)
return tesSUCCESS;
if (!view.exists(trustLineKey) && createAsset && !receiverIssuer)
{
// Can the account cover the trust line's reserve?
if (std::uint32_t const ownerCount = {sleDest->at(sfOwnerCount)};
xrpBalance < view.fees().accountReserve(ownerCount + 1))
{
JLOG(journal.trace()) << "Trust line does not exist. "
"Insufficient reserve to create line.";
return tecNO_LINE_INSUF_RESERVE;
}
Currency const currency = amount.getCurrency();
STAmount initialBalance(amount.issue());
initialBalance.setIssuer(noAccount());
// clang-format off
if (TER const ter = trustCreate(
view, // payment sandbox
recvLow, // is dest low?
issuer, // source
receiver, // destination
trustLineKey.key, // ledger index
sleDest, // Account to add to
false, // authorize account
(sleDest->getFlags() & lsfDefaultRipple) == 0,
false, // freeze trust line
false, // deep freeze trust line
initialBalance, // zero initial balance
Issue(currency, receiver), // limit of zero
0, // quality in
0, // quality out
journal); // journal
!isTesSuccess(ter))
{
return ter; // LCOV_EXCL_LINE
}
// clang-format on
view.update(sleDest);
}
if (!view.exists(trustLineKey) && !receiverIssuer)
return tecNO_LINE;
auto const xferRate = transferRate(view, amount);
// update if issuer rate is less than locked rate
if (xferRate < lockedRate)
lockedRate = xferRate;
// Transfer Rate only applies when:
// 1. Issuer is not involved in the transfer (senderIssuer or
// receiverIssuer)
// 2. The locked rate is different from the parity rate
// NOTE: Transfer fee in escrow works a bit differently from a normal
// payment. In escrow, the fee is deducted from the locked/sending amount,
// whereas in a normal payment, the transfer fee is taken on top of the
// sending amount.
auto finalAmt = amount;
if ((!senderIssuer && !receiverIssuer) && lockedRate != parityRate)
{
// compute transfer fee, if any
auto const xferFee = amount.value() - divideRound(amount, lockedRate, amount.issue(), true);
// compute balance to transfer
finalAmt = amount.value() - xferFee;
}
// validate the line limit if the account submitting txn is not the receiver
// of the funds
if (!createAsset)
{
auto const sleRippleState = view.peek(trustLineKey);
if (!sleRippleState)
return tecINTERNAL; // LCOV_EXCL_LINE
// if the issuer is the high, then we use the low limit
// otherwise we use the high limit
STAmount const lineLimit =
sleRippleState->getFieldAmount(issuerHigh ? sfLowLimit : sfHighLimit);
STAmount lineBalance = sleRippleState->getFieldAmount(sfBalance);
// flip the sign of the line balance if the issuer is not high
if (!issuerHigh)
lineBalance.negate();
// add the final amount to the line balance
lineBalance += finalAmt;
// if the transfer would exceed the line limit return tecLIMIT_EXCEEDED
if (lineLimit < lineBalance)
return tecLIMIT_EXCEEDED;
}
// if destination is not the issuer then transfer funds
if (!receiverIssuer)
{
auto const ter = rippleCredit(view, issuer, receiver, finalAmt, true, journal);
if (ter != tesSUCCESS)
return ter; // LCOV_EXCL_LINE
}
return tesSUCCESS;
}
template <>
inline TER
escrowUnlockApplyHelper<MPTIssue>(
ApplyView& view,
Rate lockedRate,
std::shared_ptr<SLE> const& sleDest,
STAmount const& xrpBalance,
STAmount const& amount,
AccountID const& issuer,
AccountID const& sender,
AccountID const& receiver,
bool createAsset,
beast::Journal journal)
{
bool const senderIssuer = issuer == sender;
bool const receiverIssuer = issuer == receiver;
auto const mptID = amount.get<MPTIssue>().getMptID();
auto const issuanceKey = keylet::mptIssuance(mptID);
if (!view.exists(keylet::mptoken(issuanceKey.key, receiver)) && createAsset && !receiverIssuer)
{
if (std::uint32_t const ownerCount = {sleDest->at(sfOwnerCount)};
xrpBalance < view.fees().accountReserve(ownerCount + 1))
{
return tecINSUFFICIENT_RESERVE;
}
if (auto const ter = MPTokenAuthorize::createMPToken(view, mptID, receiver, 0);
!isTesSuccess(ter))
{
return ter; // LCOV_EXCL_LINE
}
// update owner count.
adjustOwnerCount(view, sleDest, 1, journal);
}
if (!view.exists(keylet::mptoken(issuanceKey.key, receiver)) && !receiverIssuer)
return tecNO_PERMISSION;
auto const xferRate = transferRate(view, amount);
// update if issuer rate is less than locked rate
if (xferRate < lockedRate)
lockedRate = xferRate;
// Transfer Rate only applies when:
// 1. Issuer is not involved in the transfer (senderIssuer or
// receiverIssuer)
// 2. The locked rate is different from the parity rate
// NOTE: Transfer fee in escrow works a bit differently from a normal
// payment. In escrow, the fee is deducted from the locked/sending amount,
// whereas in a normal payment, the transfer fee is taken on top of the
// sending amount.
auto finalAmt = amount;
if ((!senderIssuer && !receiverIssuer) && lockedRate != parityRate)
{
// compute transfer fee, if any
auto const xferFee = amount.value() - divideRound(amount, lockedRate, amount.asset(), true);
// compute balance to transfer
finalAmt = amount.value() - xferFee;
}
return rippleUnlockEscrowMPT(
view,
sender,
receiver,
finalAmt,
view.rules().enabled(fixTokenEscrowV1) ? amount : finalAmt,
journal);
}
} // namespace xrpl

View File

@@ -0,0 +1,542 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/PayChan.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/tx/transactors/payment_channel/PayChan.h>
namespace xrpl {
/*
PaymentChannel
Payment channels permit off-ledger checkpoints of XRP payments flowing
in a single direction. A channel sequesters the owner's XRP in its own
ledger entry. The owner can authorize the recipient to claim up to a
given balance by giving the receiver a signed message (off-ledger). The
recipient can use this signed message to claim any unpaid balance while
the channel remains open. The owner can top off the line as needed. If
the channel has not paid out all its funds, the owner must wait out a
delay to close the channel to give the recipient a chance to supply any
claims. The recipient can close the channel at any time. Any transaction
that touches the channel after the expiration time will close the
channel. The total amount paid increases monotonically as newer claims
are issued. When the channel is closed any remaining balance is returned
to the owner. Channels are intended to permit intermittent off-ledger
settlement of ILP trust lines as balances get substantial. For
bidirectional channels, a payment channel can be used in each direction.
PaymentChannelCreate
Create a unidirectional channel. The parameters are:
Destination
The recipient at the end of the channel.
Amount
The amount of XRP to deposit in the channel immediately.
SettleDelay
The amount of time everyone but the recipient must wait for a
superior claim.
PublicKey
The key that will sign claims against the channel.
CancelAfter (optional)
Any channel transaction that touches this channel after the
`CancelAfter` time will close it.
DestinationTag (optional)
Destination tags allow the different accounts inside of a Hosted
Wallet to be mapped back onto the Ripple ledger. The destination tag
tells the server to which account in the Hosted Wallet the funds are
intended to go to. Required if the destination has lsfRequireDestTag
set.
SourceTag (optional)
Source tags allow the different accounts inside of a Hosted Wallet
to be mapped back onto the Ripple ledger. Source tags are similar to
destination tags but are for the channel owner to identify their own
transactions.
PaymentChannelFund
Add additional funds to the payment channel. Only the channel owner may
use this transaction. The parameters are:
Channel
The 256-bit ID of the channel.
Amount
The amount of XRP to add.
Expiration (optional)
Time the channel closes. The transaction will fail if the expiration
times does not satisfy the SettleDelay constraints.
PaymentChannelClaim
Place a claim against an existing channel. The parameters are:
Channel
The 256-bit ID of the channel.
Balance (optional)
The total amount of XRP delivered after this claim is processed
(optional, not needed if just closing). Amount (optional) The amount of XRP
the signature is for (not needed if equal to Balance or just closing the
line). Signature (optional) Authorization for the balance above, signed by
the owner (optional, not needed if closing or owner is performing the
transaction). The signature if for the following message: CLM\0 followed by
the 256-bit channel ID, and a 64-bit integer drops. PublicKey (optional) The
public key that made the signature (optional, required if a signature is
present) Flags tfClose Request that the channel be closed tfRenew Request
that the channel's expiration be reset. Only the owner may renew a channel.
*/
//------------------------------------------------------------------------------
static TER
closeChannel(
std::shared_ptr<SLE> const& slep,
ApplyView& view,
uint256 const& key,
beast::Journal j)
{
AccountID const src = (*slep)[sfAccount];
// Remove PayChan from owner directory
{
auto const page = (*slep)[sfOwnerNode];
if (!view.dirRemove(keylet::ownerDir(src), page, key, true))
{
// LCOV_EXCL_START
JLOG(j.fatal()) << "Could not remove paychan from src owner directory";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
// Remove PayChan from recipient's owner directory, if present.
if (auto const page = (*slep)[~sfDestinationNode])
{
auto const dst = (*slep)[sfDestination];
if (!view.dirRemove(keylet::ownerDir(dst), *page, key, true))
{
// LCOV_EXCL_START
JLOG(j.fatal()) << "Could not remove paychan from dst owner directory";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
// Transfer amount back to owner, decrement owner count
auto const sle = view.peek(keylet::account(src));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
XRPL_ASSERT(
(*slep)[sfAmount] >= (*slep)[sfBalance], "xrpl::closeChannel : minimum channel amount");
(*sle)[sfBalance] = (*sle)[sfBalance] + (*slep)[sfAmount] - (*slep)[sfBalance];
adjustOwnerCount(view, sle, -1, j);
view.update(sle);
// Remove PayChan from ledger
view.erase(slep);
return tesSUCCESS;
}
//------------------------------------------------------------------------------
TxConsequences
PayChanCreate::makeTxConsequences(PreflightContext const& ctx)
{
return TxConsequences{ctx.tx, ctx.tx[sfAmount].xrp()};
}
NotTEC
PayChanCreate::preflight(PreflightContext const& ctx)
{
if (!isXRP(ctx.tx[sfAmount]) || (ctx.tx[sfAmount] <= beast::zero))
return temBAD_AMOUNT;
if (ctx.tx[sfAccount] == ctx.tx[sfDestination])
return temDST_IS_SRC;
if (!publicKeyType(ctx.tx[sfPublicKey]))
return temMALFORMED;
return tesSUCCESS;
}
TER
PayChanCreate::preclaim(PreclaimContext const& ctx)
{
auto const account = ctx.tx[sfAccount];
auto const sle = ctx.view.read(keylet::account(account));
if (!sle)
return terNO_ACCOUNT;
// Check reserve and funds availability
{
auto const balance = (*sle)[sfBalance];
auto const reserve = ctx.view.fees().accountReserve((*sle)[sfOwnerCount] + 1);
if (balance < reserve)
return tecINSUFFICIENT_RESERVE;
if (balance < reserve + ctx.tx[sfAmount])
return tecUNFUNDED;
}
auto const dst = ctx.tx[sfDestination];
{
// Check destination account
auto const sled = ctx.view.read(keylet::account(dst));
if (!sled)
return tecNO_DST;
auto const flags = sled->getFlags();
// Check if they have disallowed incoming payment channels
if (flags & lsfDisallowIncomingPayChan)
return tecNO_PERMISSION;
if ((flags & lsfRequireDestTag) && !ctx.tx[~sfDestinationTag])
return tecDST_TAG_NEEDED;
// Pseudo-accounts cannot receive payment channels, other than native
// to their underlying ledger object - implemented in their respective
// transaction types. Note, this is not amendment-gated because all
// writes to pseudo-account discriminator fields **are** amendment
// gated, hence the behaviour of this check will always match the
// currently active amendments.
if (isPseudoAccount(sled))
return tecNO_PERMISSION;
}
return tesSUCCESS;
}
TER
PayChanCreate::doApply()
{
auto const account = ctx_.tx[sfAccount];
auto const sle = ctx_.view().peek(keylet::account(account));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
if (ctx_.view().rules().enabled(fixPayChanCancelAfter))
{
auto const closeTime = ctx_.view().header().parentCloseTime;
if (ctx_.tx[~sfCancelAfter] && after(closeTime, ctx_.tx[sfCancelAfter]))
return tecEXPIRED;
}
auto const dst = ctx_.tx[sfDestination];
// Create PayChan in ledger.
//
// Note that we we use the value from the sequence or ticket as the
// payChan sequence. For more explanation see comments in SeqProxy.h.
Keylet const payChanKeylet = keylet::payChan(account, dst, ctx_.tx.getSeqValue());
auto const slep = std::make_shared<SLE>(payChanKeylet);
// Funds held in this channel
(*slep)[sfAmount] = ctx_.tx[sfAmount];
// Amount channel has already paid
(*slep)[sfBalance] = ctx_.tx[sfAmount].zeroed();
(*slep)[sfAccount] = account;
(*slep)[sfDestination] = dst;
(*slep)[sfSettleDelay] = ctx_.tx[sfSettleDelay];
(*slep)[sfPublicKey] = ctx_.tx[sfPublicKey];
(*slep)[~sfCancelAfter] = ctx_.tx[~sfCancelAfter];
(*slep)[~sfSourceTag] = ctx_.tx[~sfSourceTag];
(*slep)[~sfDestinationTag] = ctx_.tx[~sfDestinationTag];
if (ctx_.view().rules().enabled(fixIncludeKeyletFields))
{
(*slep)[sfSequence] = ctx_.tx.getSeqValue();
}
ctx_.view().insert(slep);
// Add PayChan to owner directory
{
auto const page = ctx_.view().dirInsert(
keylet::ownerDir(account), payChanKeylet, describeOwnerDir(account));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfOwnerNode] = *page;
}
// Add PayChan to the recipient's owner directory
{
auto const page =
ctx_.view().dirInsert(keylet::ownerDir(dst), payChanKeylet, describeOwnerDir(dst));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfDestinationNode] = *page;
}
// Deduct owner's balance, increment owner count
(*sle)[sfBalance] = (*sle)[sfBalance] - ctx_.tx[sfAmount];
adjustOwnerCount(ctx_.view(), sle, 1, ctx_.journal);
ctx_.view().update(sle);
return tesSUCCESS;
}
//------------------------------------------------------------------------------
TxConsequences
PayChanFund::makeTxConsequences(PreflightContext const& ctx)
{
return TxConsequences{ctx.tx, ctx.tx[sfAmount].xrp()};
}
NotTEC
PayChanFund::preflight(PreflightContext const& ctx)
{
if (!isXRP(ctx.tx[sfAmount]) || (ctx.tx[sfAmount] <= beast::zero))
return temBAD_AMOUNT;
return tesSUCCESS;
}
TER
PayChanFund::doApply()
{
Keylet const k(ltPAYCHAN, ctx_.tx[sfChannel]);
auto const slep = ctx_.view().peek(k);
if (!slep)
return tecNO_ENTRY;
AccountID const src = (*slep)[sfAccount];
auto const txAccount = ctx_.tx[sfAccount];
auto const expiration = (*slep)[~sfExpiration];
{
auto const cancelAfter = (*slep)[~sfCancelAfter];
auto const closeTime = ctx_.view().header().parentCloseTime.time_since_epoch().count();
if ((cancelAfter && closeTime >= *cancelAfter) || (expiration && closeTime >= *expiration))
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.journal("View"));
}
if (src != txAccount)
// only the owner can add funds or extend
return tecNO_PERMISSION;
if (auto extend = ctx_.tx[~sfExpiration])
{
auto minExpiration = ctx_.view().header().parentCloseTime.time_since_epoch().count() +
(*slep)[sfSettleDelay];
if (expiration && *expiration < minExpiration)
minExpiration = *expiration;
if (*extend < minExpiration)
return temBAD_EXPIRATION;
(*slep)[~sfExpiration] = *extend;
ctx_.view().update(slep);
}
auto const sle = ctx_.view().peek(keylet::account(txAccount));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
{
// Check reserve and funds availability
auto const balance = (*sle)[sfBalance];
auto const reserve = ctx_.view().fees().accountReserve((*sle)[sfOwnerCount]);
if (balance < reserve)
return tecINSUFFICIENT_RESERVE;
if (balance < reserve + ctx_.tx[sfAmount])
return tecUNFUNDED;
}
// do not allow adding funds if dst does not exist
if (AccountID const dst = (*slep)[sfDestination]; !ctx_.view().read(keylet::account(dst)))
{
return tecNO_DST;
}
(*slep)[sfAmount] = (*slep)[sfAmount] + ctx_.tx[sfAmount];
ctx_.view().update(slep);
(*sle)[sfBalance] = (*sle)[sfBalance] - ctx_.tx[sfAmount];
ctx_.view().update(sle);
return tesSUCCESS;
}
//------------------------------------------------------------------------------
bool
PayChanClaim::checkExtraFeatures(PreflightContext const& ctx)
{
return !ctx.tx.isFieldPresent(sfCredentialIDs) || ctx.rules.enabled(featureCredentials);
}
std::uint32_t
PayChanClaim::getFlagsMask(PreflightContext const&)
{
return tfPaymentChannelClaimMask;
}
NotTEC
PayChanClaim::preflight(PreflightContext const& ctx)
{
auto const bal = ctx.tx[~sfBalance];
if (bal && (!isXRP(*bal) || *bal <= beast::zero))
return temBAD_AMOUNT;
auto const amt = ctx.tx[~sfAmount];
if (amt && (!isXRP(*amt) || *amt <= beast::zero))
return temBAD_AMOUNT;
if (bal && amt && *bal > *amt)
return temBAD_AMOUNT;
{
auto const flags = ctx.tx.getFlags();
if ((flags & tfClose) && (flags & tfRenew))
return temMALFORMED;
}
if (auto const sig = ctx.tx[~sfSignature])
{
if (!(ctx.tx[~sfPublicKey] && bal))
return temMALFORMED;
// Check the signature
// The signature isn't needed if txAccount == src, but if it's
// present, check it
auto const reqBalance = bal->xrp();
auto const authAmt = amt ? amt->xrp() : reqBalance;
if (reqBalance > authAmt)
return temBAD_AMOUNT;
Keylet const k(ltPAYCHAN, ctx.tx[sfChannel]);
if (!publicKeyType(ctx.tx[sfPublicKey]))
return temMALFORMED;
PublicKey const pk(ctx.tx[sfPublicKey]);
Serializer msg;
serializePayChanAuthorization(msg, k.key, authAmt);
if (!verify(pk, msg.slice(), *sig))
return temBAD_SIGNATURE;
}
if (auto const err = credentials::checkFields(ctx.tx, ctx.j); !isTesSuccess(err))
return err;
return tesSUCCESS;
}
TER
PayChanClaim::preclaim(PreclaimContext const& ctx)
{
if (!ctx.view.rules().enabled(featureCredentials))
return Transactor::preclaim(ctx);
if (auto const err = credentials::valid(ctx.tx, ctx.view, ctx.tx[sfAccount], ctx.j);
!isTesSuccess(err))
return err;
return tesSUCCESS;
}
TER
PayChanClaim::doApply()
{
Keylet const k(ltPAYCHAN, ctx_.tx[sfChannel]);
auto const slep = ctx_.view().peek(k);
if (!slep)
return tecNO_TARGET;
AccountID const src = (*slep)[sfAccount];
AccountID const dst = (*slep)[sfDestination];
AccountID const txAccount = ctx_.tx[sfAccount];
auto const curExpiration = (*slep)[~sfExpiration];
{
auto const cancelAfter = (*slep)[~sfCancelAfter];
auto const closeTime = ctx_.view().header().parentCloseTime.time_since_epoch().count();
if ((cancelAfter && closeTime >= *cancelAfter) ||
(curExpiration && closeTime >= *curExpiration))
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.journal("View"));
}
if (txAccount != src && txAccount != dst)
return tecNO_PERMISSION;
if (ctx_.tx[~sfBalance])
{
auto const chanBalance = slep->getFieldAmount(sfBalance).xrp();
auto const chanFunds = slep->getFieldAmount(sfAmount).xrp();
auto const reqBalance = ctx_.tx[sfBalance].xrp();
if (txAccount == dst && !ctx_.tx[~sfSignature])
return temBAD_SIGNATURE;
if (ctx_.tx[~sfSignature])
{
PublicKey const pk((*slep)[sfPublicKey]);
if (ctx_.tx[sfPublicKey] != pk)
return temBAD_SIGNER;
}
if (reqBalance > chanFunds)
return tecUNFUNDED_PAYMENT;
if (reqBalance <= chanBalance)
// nothing requested
return tecUNFUNDED_PAYMENT;
auto const sled = ctx_.view().peek(keylet::account(dst));
if (!sled)
return tecNO_DST;
if (auto err =
verifyDepositPreauth(ctx_.tx, ctx_.view(), txAccount, dst, sled, ctx_.journal);
!isTesSuccess(err))
return err;
(*slep)[sfBalance] = ctx_.tx[sfBalance];
XRPAmount const reqDelta = reqBalance - chanBalance;
XRPL_ASSERT(reqDelta >= beast::zero, "xrpl::PayChanClaim::doApply : minimum balance delta");
(*sled)[sfBalance] = (*sled)[sfBalance] + reqDelta;
ctx_.view().update(sled);
ctx_.view().update(slep);
}
if (ctx_.tx.getFlags() & tfRenew)
{
if (src != txAccount)
return tecNO_PERMISSION;
(*slep)[~sfExpiration] = std::nullopt;
ctx_.view().update(slep);
}
if (ctx_.tx.getFlags() & tfClose)
{
// Channel will close immediately if dry or the receiver closes
if (dst == txAccount || (*slep)[sfBalance] == (*slep)[sfAmount])
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.journal("View"));
auto const settleExpiration =
ctx_.view().header().parentCloseTime.time_since_epoch().count() +
(*slep)[sfSettleDelay];
if (!curExpiration || *curExpiration > settleExpiration)
{
(*slep)[~sfExpiration] = settleExpiration;
ctx_.view().update(slep);
}
}
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,186 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/CredentialHelpers.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/PayChan.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/tx/transactors/payment_channel/PayChanClaim.h>
#include <libxrpl/tx/transactors/payment_channel/PayChanHelpers.h>
namespace xrpl {
bool
PayChanClaim::checkExtraFeatures(PreflightContext const& ctx)
{
return !ctx.tx.isFieldPresent(sfCredentialIDs) || ctx.rules.enabled(featureCredentials);
}
std::uint32_t
PayChanClaim::getFlagsMask(PreflightContext const&)
{
return tfPaymentChannelClaimMask;
}
NotTEC
PayChanClaim::preflight(PreflightContext const& ctx)
{
auto const bal = ctx.tx[~sfBalance];
if (bal && (!isXRP(*bal) || *bal <= beast::zero))
return temBAD_AMOUNT;
auto const amt = ctx.tx[~sfAmount];
if (amt && (!isXRP(*amt) || *amt <= beast::zero))
return temBAD_AMOUNT;
if (bal && amt && *bal > *amt)
return temBAD_AMOUNT;
{
auto const flags = ctx.tx.getFlags();
if ((flags & tfClose) && (flags & tfRenew))
return temMALFORMED;
}
if (auto const sig = ctx.tx[~sfSignature])
{
if (!(ctx.tx[~sfPublicKey] && bal))
return temMALFORMED;
// Check the signature
// The signature isn't needed if txAccount == src, but if it's
// present, check it
auto const reqBalance = bal->xrp();
auto const authAmt = amt ? amt->xrp() : reqBalance;
if (reqBalance > authAmt)
return temBAD_AMOUNT;
Keylet const k(ltPAYCHAN, ctx.tx[sfChannel]);
if (!publicKeyType(ctx.tx[sfPublicKey]))
return temMALFORMED;
PublicKey const pk(ctx.tx[sfPublicKey]);
Serializer msg;
serializePayChanAuthorization(msg, k.key, authAmt);
if (!verify(pk, msg.slice(), *sig))
return temBAD_SIGNATURE;
}
if (auto const err = credentials::checkFields(ctx.tx, ctx.j); !isTesSuccess(err))
return err;
return tesSUCCESS;
}
TER
PayChanClaim::preclaim(PreclaimContext const& ctx)
{
if (!ctx.view.rules().enabled(featureCredentials))
return Transactor::preclaim(ctx);
if (auto const err = credentials::valid(ctx.tx, ctx.view, ctx.tx[sfAccount], ctx.j);
!isTesSuccess(err))
return err;
return tesSUCCESS;
}
TER
PayChanClaim::doApply()
{
Keylet const k(ltPAYCHAN, ctx_.tx[sfChannel]);
auto const slep = ctx_.view().peek(k);
if (!slep)
return tecNO_TARGET;
AccountID const src = (*slep)[sfAccount];
AccountID const dst = (*slep)[sfDestination];
AccountID const txAccount = ctx_.tx[sfAccount];
auto const curExpiration = (*slep)[~sfExpiration];
{
auto const cancelAfter = (*slep)[~sfCancelAfter];
auto const closeTime = ctx_.view().header().parentCloseTime.time_since_epoch().count();
if ((cancelAfter && closeTime >= *cancelAfter) ||
(curExpiration && closeTime >= *curExpiration))
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.journal("View"));
}
if (txAccount != src && txAccount != dst)
return tecNO_PERMISSION;
if (ctx_.tx[~sfBalance])
{
auto const chanBalance = slep->getFieldAmount(sfBalance).xrp();
auto const chanFunds = slep->getFieldAmount(sfAmount).xrp();
auto const reqBalance = ctx_.tx[sfBalance].xrp();
if (txAccount == dst && !ctx_.tx[~sfSignature])
return temBAD_SIGNATURE;
if (ctx_.tx[~sfSignature])
{
PublicKey const pk((*slep)[sfPublicKey]);
if (ctx_.tx[sfPublicKey] != pk)
return temBAD_SIGNER;
}
if (reqBalance > chanFunds)
return tecUNFUNDED_PAYMENT;
if (reqBalance <= chanBalance)
// nothing requested
return tecUNFUNDED_PAYMENT;
auto const sled = ctx_.view().peek(keylet::account(dst));
if (!sled)
return tecNO_DST;
if (auto err =
verifyDepositPreauth(ctx_.tx, ctx_.view(), txAccount, dst, sled, ctx_.journal);
!isTesSuccess(err))
return err;
(*slep)[sfBalance] = ctx_.tx[sfBalance];
XRPAmount const reqDelta = reqBalance - chanBalance;
XRPL_ASSERT(reqDelta >= beast::zero, "xrpl::PayChanClaim::doApply : minimum balance delta");
(*sled)[sfBalance] = (*sled)[sfBalance] + reqDelta;
ctx_.view().update(sled);
ctx_.view().update(slep);
}
if (ctx_.tx.getFlags() & tfRenew)
{
if (src != txAccount)
return tecNO_PERMISSION;
(*slep)[~sfExpiration] = std::nullopt;
ctx_.view().update(slep);
}
if (ctx_.tx.getFlags() & tfClose)
{
// Channel will close immediately if dry or the receiver closes
if (dst == txAccount || (*slep)[sfBalance] == (*slep)[sfAmount])
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.journal("View"));
auto const settleExpiration =
ctx_.view().header().parentCloseTime.time_since_epoch().count() +
(*slep)[sfSettleDelay];
if (!curExpiration || *curExpiration > settleExpiration)
{
(*slep)[~sfExpiration] = settleExpiration;
ctx_.view().update(slep);
}
}
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,173 +0,0 @@
#include <xrpl/basics/chrono.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/tx/transactors/payment_channel/PayChanCreate.h>
namespace xrpl {
/*
PaymentChannel
Payment channels permit off-ledger checkpoints of XRP payments flowing
in a single direction. A channel sequesters the owner's XRP in its own
ledger entry. The owner can authorize the recipient to claim up to a
given balance by giving the receiver a signed message (off-ledger). The
recipient can use this signed message to claim any unpaid balance while
the channel remains open. The owner can top off the line as needed. If
the channel has not paid out all its funds, the owner must wait out a
delay to close the channel to give the recipient a chance to supply any
claims. The recipient can close the channel at any time. Any transaction
that touches the channel after the expiration time will close the
channel. The total amount paid increases monotonically as newer claims
are issued. When the channel is closed any remaining balance is returned
to the owner. Channels are intended to permit intermittent off-ledger
settlement of ILP trust lines as balances get substantial. For
bidirectional channels, a payment channel can be used in each direction.
*/
//------------------------------------------------------------------------------
TxConsequences
PayChanCreate::makeTxConsequences(PreflightContext const& ctx)
{
return TxConsequences{ctx.tx, ctx.tx[sfAmount].xrp()};
}
NotTEC
PayChanCreate::preflight(PreflightContext const& ctx)
{
if (!isXRP(ctx.tx[sfAmount]) || (ctx.tx[sfAmount] <= beast::zero))
return temBAD_AMOUNT;
if (ctx.tx[sfAccount] == ctx.tx[sfDestination])
return temDST_IS_SRC;
if (!publicKeyType(ctx.tx[sfPublicKey]))
return temMALFORMED;
return tesSUCCESS;
}
TER
PayChanCreate::preclaim(PreclaimContext const& ctx)
{
auto const account = ctx.tx[sfAccount];
auto const sle = ctx.view.read(keylet::account(account));
if (!sle)
return terNO_ACCOUNT;
// Check reserve and funds availability
{
auto const balance = (*sle)[sfBalance];
auto const reserve = ctx.view.fees().accountReserve((*sle)[sfOwnerCount] + 1);
if (balance < reserve)
return tecINSUFFICIENT_RESERVE;
if (balance < reserve + ctx.tx[sfAmount])
return tecUNFUNDED;
}
auto const dst = ctx.tx[sfDestination];
{
// Check destination account
auto const sled = ctx.view.read(keylet::account(dst));
if (!sled)
return tecNO_DST;
auto const flags = sled->getFlags();
// Check if they have disallowed incoming payment channels
if (flags & lsfDisallowIncomingPayChan)
return tecNO_PERMISSION;
if ((flags & lsfRequireDestTag) && !ctx.tx[~sfDestinationTag])
return tecDST_TAG_NEEDED;
// Pseudo-accounts cannot receive payment channels, other than native
// to their underlying ledger object - implemented in their respective
// transaction types. Note, this is not amendment-gated because all
// writes to pseudo-account discriminator fields **are** amendment
// gated, hence the behaviour of this check will always match the
// currently active amendments.
if (isPseudoAccount(sled))
return tecNO_PERMISSION;
}
return tesSUCCESS;
}
TER
PayChanCreate::doApply()
{
auto const account = ctx_.tx[sfAccount];
auto const sle = ctx_.view().peek(keylet::account(account));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
if (ctx_.view().rules().enabled(fixPayChanCancelAfter))
{
auto const closeTime = ctx_.view().header().parentCloseTime;
if (ctx_.tx[~sfCancelAfter] && after(closeTime, ctx_.tx[sfCancelAfter]))
return tecEXPIRED;
}
auto const dst = ctx_.tx[sfDestination];
// Create PayChan in ledger.
//
// Note that we we use the value from the sequence or ticket as the
// payChan sequence. For more explanation see comments in SeqProxy.h.
Keylet const payChanKeylet = keylet::payChan(account, dst, ctx_.tx.getSeqValue());
auto const slep = std::make_shared<SLE>(payChanKeylet);
// Funds held in this channel
(*slep)[sfAmount] = ctx_.tx[sfAmount];
// Amount channel has already paid
(*slep)[sfBalance] = ctx_.tx[sfAmount].zeroed();
(*slep)[sfAccount] = account;
(*slep)[sfDestination] = dst;
(*slep)[sfSettleDelay] = ctx_.tx[sfSettleDelay];
(*slep)[sfPublicKey] = ctx_.tx[sfPublicKey];
(*slep)[~sfCancelAfter] = ctx_.tx[~sfCancelAfter];
(*slep)[~sfSourceTag] = ctx_.tx[~sfSourceTag];
(*slep)[~sfDestinationTag] = ctx_.tx[~sfDestinationTag];
if (ctx_.view().rules().enabled(fixIncludeKeyletFields))
{
(*slep)[sfSequence] = ctx_.tx.getSeqValue();
}
ctx_.view().insert(slep);
// Add PayChan to owner directory
{
auto const page = ctx_.view().dirInsert(
keylet::ownerDir(account), payChanKeylet, describeOwnerDir(account));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfOwnerNode] = *page;
}
// Add PayChan to the recipient's owner directory
{
auto const page =
ctx_.view().dirInsert(keylet::ownerDir(dst), payChanKeylet, describeOwnerDir(dst));
if (!page)
return tecDIR_FULL; // LCOV_EXCL_LINE
(*slep)[sfDestinationNode] = *page;
}
// Deduct owner's balance, increment owner count
(*sle)[sfBalance] = (*sle)[sfBalance] - ctx_.tx[sfAmount];
adjustOwnerCount(ctx_.view(), sle, 1, ctx_.journal);
ctx_.view().update(sle);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,92 +0,0 @@
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/tx/transactors/payment_channel/PayChanFund.h>
#include <libxrpl/tx/transactors/payment_channel/PayChanHelpers.h>
namespace xrpl {
TxConsequences
PayChanFund::makeTxConsequences(PreflightContext const& ctx)
{
return TxConsequences{ctx.tx, ctx.tx[sfAmount].xrp()};
}
NotTEC
PayChanFund::preflight(PreflightContext const& ctx)
{
if (!isXRP(ctx.tx[sfAmount]) || (ctx.tx[sfAmount] <= beast::zero))
return temBAD_AMOUNT;
return tesSUCCESS;
}
TER
PayChanFund::doApply()
{
Keylet const k(ltPAYCHAN, ctx_.tx[sfChannel]);
auto const slep = ctx_.view().peek(k);
if (!slep)
return tecNO_ENTRY;
AccountID const src = (*slep)[sfAccount];
auto const txAccount = ctx_.tx[sfAccount];
auto const expiration = (*slep)[~sfExpiration];
{
auto const cancelAfter = (*slep)[~sfCancelAfter];
auto const closeTime = ctx_.view().header().parentCloseTime.time_since_epoch().count();
if ((cancelAfter && closeTime >= *cancelAfter) || (expiration && closeTime >= *expiration))
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.journal("View"));
}
if (src != txAccount)
// only the owner can add funds or extend
return tecNO_PERMISSION;
if (auto extend = ctx_.tx[~sfExpiration])
{
auto minExpiration = ctx_.view().header().parentCloseTime.time_since_epoch().count() +
(*slep)[sfSettleDelay];
if (expiration && *expiration < minExpiration)
minExpiration = *expiration;
if (*extend < minExpiration)
return temBAD_EXPIRATION;
(*slep)[~sfExpiration] = *extend;
ctx_.view().update(slep);
}
auto const sle = ctx_.view().peek(keylet::account(txAccount));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
{
// Check reserve and funds availability
auto const balance = (*sle)[sfBalance];
auto const reserve = ctx_.view().fees().accountReserve((*sle)[sfOwnerCount]);
if (balance < reserve)
return tecINSUFFICIENT_RESERVE;
if (balance < reserve + ctx_.tx[sfAmount])
return tecUNFUNDED;
}
// do not allow adding funds if dst does not exist
if (AccountID const dst = (*slep)[sfDestination]; !ctx_.view().read(keylet::account(dst)))
{
return tecNO_DST;
}
(*slep)[sfAmount] = (*slep)[sfAmount] + ctx_.tx[sfAmount];
ctx_.view().update(slep);
(*sle)[sfBalance] = (*sle)[sfBalance] - ctx_.tx[sfAmount];
ctx_.view().update(sle);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,58 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/Indexes.h>
#include <libxrpl/tx/transactors/payment_channel/PayChanHelpers.h>
namespace xrpl {
TER
closeChannel(
std::shared_ptr<SLE> const& slep,
ApplyView& view,
uint256 const& key,
beast::Journal j)
{
AccountID const src = (*slep)[sfAccount];
// Remove PayChan from owner directory
{
auto const page = (*slep)[sfOwnerNode];
if (!view.dirRemove(keylet::ownerDir(src), page, key, true))
{
// LCOV_EXCL_START
JLOG(j.fatal()) << "Could not remove paychan from src owner directory";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
// Remove PayChan from recipient's owner directory, if present.
if (auto const page = (*slep)[~sfDestinationNode])
{
auto const dst = (*slep)[sfDestination];
if (!view.dirRemove(keylet::ownerDir(dst), *page, key, true))
{
// LCOV_EXCL_START
JLOG(j.fatal()) << "Could not remove paychan from dst owner directory";
return tefBAD_LEDGER;
// LCOV_EXCL_STOP
}
}
// Transfer amount back to owner, decrement owner count
auto const sle = view.peek(keylet::account(src));
if (!sle)
return tefINTERNAL; // LCOV_EXCL_LINE
XRPL_ASSERT(
(*slep)[sfAmount] >= (*slep)[sfBalance], "xrpl::closeChannel : minimum channel amount");
(*sle)[sfBalance] = (*sle)[sfBalance] + (*slep)[sfAmount] - (*slep)[sfBalance];
adjustOwnerCount(view, sle, -1, j);
view.update(sle);
// Remove PayChan from ledger
view.erase(slep);
return tesSUCCESS;
}
} // namespace xrpl

View File

@@ -1,15 +0,0 @@
#pragma once
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/protocol/UintTypes.h>
namespace xrpl {
TER
closeChannel(
std::shared_ptr<SLE> const& slep,
ApplyView& view,
uint256 const& key,
beast::Journal j);
} // namespace xrpl

View File

@@ -8,6 +8,7 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -131,7 +132,6 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -155,11 +155,11 @@ public:
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;
@@ -240,28 +240,27 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
Json::Value result;
gate g;
// Test RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));
@@ -269,22 +268,22 @@ public:
// Test RPC::Tuning::max_auto_src_cur source currencies.
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
env.trust(Account("alice")["AUD"](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));

View File

@@ -0,0 +1,537 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
/**
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
* and JobQueueAwaiter.
*
* Dependency Diagram
* ==================
*
* CoroTask_test
* +-------------------------------------------------+
* | + gate (inner class) : condition_variable helper |
* +-------------------------------------------------+
* | uses
* v
* jtx::Env --> JobQueue::postCoroTask()
* |
* +-- CoroTaskRunner (suspend / post / resume)
* +-- CoroTask<void> / CoroTask<T>
* +-- JobQueueAwaiter
*
* Test Coverage Matrix
* ====================
*
* Test | Primitives exercised
* --------------------------+----------------------------------------------
* testVoidCompletion | CoroTask<void> basic lifecycle
* testCorrectOrder | suspend() -> join() -> post() -> complete
* testIncorrectOrder | post() before suspend() (race-safe path)
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
* testThreadSpecificStorage | LocalValue isolation across coroutines
* testExceptionPropagation | unhandled_exception() in promise_type
* testMultipleYields | N sequential suspend/resume cycles
* testValueReturn | CoroTask<T> co_return value
* testValueException | CoroTask<T> exception via co_await
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
* testShutdownRejection | postCoroTask returns nullptr when stopping
*/
class CoroTask_test : public beast::unit_test::suite
{
public:
/**
* Simple one-shot gate for synchronizing between test thread
* and coroutine worker threads. signal() sets the flag;
* wait_for() blocks until signaled or timeout.
*/
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
/**
* Block until signaled or timeout expires.
*
* @param rel_time Maximum duration to wait
*
* @return true if signaled before timeout
*/
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
/**
* Signal the gate, waking any waiting thread.
*/
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
/**
* CoroTask<void> runs to completion and runner becomes non-runnable.
*/
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* Correct order: suspend, join, post, complete.
* Mirrors existing Coroutine_test::correct_order.
*/
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*rp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
/**
* Incorrect order: post() before suspend(). Verifies the
* race-safe path. Mirrors Coroutine_test::incorrect_order.
*/
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
/**
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
*/
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
*sp = 1;
co_await runner->yieldAndPost();
*sp = 2;
co_await runner->yieldAndPost();
*sp = 3;
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
/**
* Per-coroutine LocalValue isolation. Each coroutine sees its own
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
*/
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTaskTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
/**
* Exception thrown in coroutine body is caught by
* promise_type::unhandled_exception(). Coroutine completes.
*/
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
/**
* Multiple sequential suspend/resume cycles via co_await.
*/
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
*rp = runner;
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> returns a value via co_return. Outer coroutine
* extracts it with co_await.
*/
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
*rp = co_await inner();
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> propagates exceptions from inner coroutines.
* Outer coroutine catches via try/catch around co_await.
*/
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
*cp = true;
}
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> chaining. Nested value-returning coroutines
* compose via co_await.
*/
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [add](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
*rp = co_await mul(3, 4);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
/**
* postCoroTask returns nullptr when JobQueue is stopping.
*/
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -40,6 +40,11 @@ public:
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
void
correct_order()
{
@@ -54,13 +59,15 @@ public:
}));
gate g1, g2;
std::shared_ptr<JobQueue::Coro> c;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
std::shared_ptr<JobQueue::CoroTaskRunner> c;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [cp = &c, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*cp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(g1.wait_for(5s));
c->join();
c->post();
@@ -81,11 +88,17 @@ public:
}));
gate g;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [gp = &g](auto runner) -> CoroTask<void> {
// Schedule a resume before suspending. The posted job
// cannot actually call resume() until the current resume()
// releases CoroTaskRunner::mutex_, which only happens after
// the coroutine suspends at co_await.
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
@@ -101,7 +114,7 @@ public:
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -118,19 +131,23 @@ public:
for (int i = 0; i < N; ++i)
{
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
a[id] = c;
g.signal();
c->yield();
jq.postCoroTask(
jtCLIENT,
"CoroTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
c->yield();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == id);
});
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}

View File

@@ -43,87 +43,91 @@ class JobQueue_test : public beast::unit_test::suite
}
}
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
void
testPostCoro()
testPostCoroTask()
{
jtx::Env env{*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// Test repeated post()s until the Coro completes.
// Test repeated post()s until the coroutine completes.
std::atomic<int> yieldCount{0};
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest1",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
coroCopy->yield();
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest1", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
while (++(*ycp) < 4)
co_await runner->suspend();
co_return;
});
BEAST_EXPECT(coro != nullptr);
BEAST_EXPECT(runner != nullptr);
// Wait for the Job to run and yield.
while (yieldCount == 0)
;
// Now re-post until the Coro says it is done.
// Now re-post until the CoroTaskRunner says it is done.
int old = yieldCount;
while (coro->runnable())
while (runner->runnable())
{
BEAST_EXPECT(coro->post());
BEAST_EXPECT(runner->post());
while (old == yieldCount)
{
}
coro->join();
runner->join();
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// Test repeated resume()s until the Coro completes.
// Test repeated resume()s until the coroutine completes.
int yieldCount{0};
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest2",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
coroCopy->yield();
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest2", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
while (++(*ycp) < 4)
co_await runner->suspend();
co_return;
});
if (!coro)
if (!runner)
{
// There's no good reason we should not get a Coro, but we
// There's no good reason we should not get a runner, but we
// can't continue without one.
BEAST_EXPECT(false);
return;
}
// Wait for the Job to run and yield.
coro->join();
runner->join();
// Now resume until the Coro says it is done.
// Now resume until the CoroTaskRunner says it is done.
int old = yieldCount;
while (coro->runnable())
while (runner->runnable())
{
coro->resume(); // Resume runs synchronously on this thread.
runner->resume(); // Resume runs synchronously on this thread.
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// If the JobQueue is stopped, we should no
// longer be able to add a Coro (and calling postCoro() should
// return false).
// longer be able to post a coroutine (and calling postCoroTask()
// should return nullptr).
using namespace std::chrono_literals;
jQueue.stop();
// The Coro should never run, so having the Coro access this
// The coroutine should never run, so having it access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
auto const coro = jQueue.postCoro(
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
unprotected = false;
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest3", [up = &unprotected](auto) -> CoroTask<void> {
*up = false;
co_return;
});
BEAST_EXPECT(coro == nullptr);
BEAST_EXPECT(runner == nullptr);
}
}
@@ -132,7 +136,7 @@ public:
run() override
{
testAddJob();
testPostCoro();
testPostCoroTask();
}
};

View File

@@ -39,9 +39,9 @@ public:
a = std::move(b);
Account c(std::move(a));
}
Account("alice");
Account("alice", KeyType::secp256k1);
Account("alice", KeyType::ed25519);
Account("alice"); // NOLINT(bugprone-unused-raii)
Account("alice", KeyType::secp256k1); // NOLINT(bugprone-unused-raii)
Account("alice", KeyType::ed25519); // NOLINT(bugprone-unused-raii)
auto const gw = Account("gw");
[](AccountID) {}(gw);
auto const USD = gw["USD"];
@@ -56,11 +56,11 @@ public:
{
using namespace jtx;
PrettyAmount(0);
PrettyAmount(1);
PrettyAmount(0u);
PrettyAmount(1u);
PrettyAmount(-1);
PrettyAmount(0); // NOLINT(bugprone-unused-raii)
PrettyAmount(1); // NOLINT(bugprone-unused-raii)
PrettyAmount(0u); // NOLINT(bugprone-unused-raii)
PrettyAmount(1u); // NOLINT(bugprone-unused-raii)
PrettyAmount(-1); // NOLINT(bugprone-unused-raii)
static_assert(!std::is_trivially_constructible<PrettyAmount, char>::value, "");
static_assert(!std::is_trivially_constructible<PrettyAmount, unsigned char>::value, "");
static_assert(!std::is_trivially_constructible<PrettyAmount, short>::value, "");

View File

@@ -6,6 +6,7 @@
#include <xrpld/rpc/RPCHandler.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
@@ -193,7 +194,6 @@ AMMTest::find_paths_request(
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;

View File

@@ -1431,7 +1431,6 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
c,
Role::ADMIN,
{},
{},
RPC::apiMaximumSupportedVersion},
jvCommand};

View File

@@ -3,6 +3,7 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/resource/Fees.h>
namespace xrpl {
@@ -99,13 +100,14 @@ GRPCServerImpl::CallData<Request, Response>::process()
// ensures that finished is always true when this CallData object
// is returned as a tag in handleRpcs(), after sending the response
finished_ = true;
auto coro = app_.getJobQueue().postCoro(
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
thisShared->process(coro);
auto runner = app_.getJobQueue().postCoroTask(
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
thisShared->processRequest();
co_return;
});
// If coro is null, then the JobQueue has already been shutdown
if (!coro)
// If runner is null, then the JobQueue has already been shutdown
if (!runner)
{
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
responder_.FinishWithError(status, this);
@@ -114,7 +116,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
GRPCServerImpl::CallData<Request, Response>::processRequest()
{
try
{
@@ -156,7 +158,6 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
request_};

View File

@@ -206,9 +206,12 @@ private:
clone() override;
private:
// process the request. Called inside the coroutine passed to JobQueue
/**
* Process the gRPC request. Called inside the CoroTask lambda
* posted to the JobQueue by process().
*/
void
process(std::shared_ptr<JobQueue::Coro> coro);
processRequest();
// return load type of this RPC
Resource::Charge

View File

@@ -3,7 +3,6 @@
#include <xrpld/rpc/Role.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/server/InfoSub.h>
namespace xrpl {
@@ -24,7 +23,6 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobQueue::Coro> coro{};
InfoSub::pointer infoSub{};
unsigned int apiVersion;
};

View File

@@ -169,13 +169,10 @@ public:
private:
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
void
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
processSession(std::shared_ptr<Session> const&);
void
processRequest(
@@ -183,7 +180,6 @@ private:
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&&,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user);

View File

@@ -14,6 +14,7 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
@@ -284,9 +285,17 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
try
{
processSession(detachedSession);
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "RPC-Client coroutine exception: " << e.what();
}
co_return;
});
if (postResult == nullptr)
{
@@ -322,17 +331,26 @@ ServerHandler::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoro(
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
try
{
auto const jr = this->processSession(session, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(
boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "WS-Client coroutine exception: " << e.what();
}
co_return;
});
if (postResult == nullptr)
{
@@ -373,10 +391,7 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
@@ -443,7 +458,6 @@ ServerHandler::processSession(
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
@@ -514,18 +528,14 @@ ServerHandler::processSession(
return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
ServerHandler::processSession(std::shared_ptr<Session> const& session)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
@@ -562,7 +572,6 @@ ServerHandler::processRequest(
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
{
@@ -819,7 +828,6 @@ ServerHandler::processRequest(
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,

View File

@@ -7,6 +7,9 @@
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/resource/Fees.h>
#include <condition_variable>
#include <mutex>
namespace xrpl {
// This interface is deprecated.
@@ -37,98 +40,40 @@ doRipplePathFind(RPC::JsonContext& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
// makeLegacyPathRequest enqueues a path-finding job that runs
// asynchronously. We block this thread with a condition_variable
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
// Replaces the old Coro yield/resume pattern with synchronous
// blocking, eliminating shutdown race conditions.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
[&]() {
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
std::lock_guard lk(mtx);
pathDone = true;
}
cv.notify_one();
},
context.consumer,
lpLedger,
context.params);
if (request)
{
context.coro->yield();
using namespace std::chrono_literals;
std::unique_lock lk(mtx);
if (!cv.wait_for(lk, 30s, [&] { return pathDone; }))
{
// Path-finding continuation never fired (e.g. shutdown
// race or unexpected failure). Return an internal error
// rather than blocking the RPC thread indefinitely.
return rpcError(rpcINTERNAL);
}
jvResult = request->doStatus(context.params);
}