mirror of
https://github.com/XRPLF/rippled.git
synced 2026-03-16 09:42:27 +00:00
Compare commits
41 Commits
pratik/std
...
bthomee/no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
48535d5226 | ||
|
|
d1a6558080 | ||
|
|
84f86b354f | ||
|
|
40a3985b02 | ||
|
|
208bd35d45 | ||
|
|
e90fbbf7b2 | ||
|
|
277450e648 | ||
|
|
e6993524ea | ||
|
|
b117ecc6a2 | ||
|
|
6c3b00c342 | ||
|
|
8c296a935a | ||
|
|
573ba82181 | ||
|
|
1542ab7e27 | ||
|
|
6374f4886d | ||
|
|
ebf336f472 | ||
|
|
ddc15ad612 | ||
|
|
82db6ac498 | ||
|
|
f749c41306 | ||
|
|
f25e47a58d | ||
|
|
2396799bd8 | ||
|
|
4855b9f96a | ||
|
|
b2f65cb7eb | ||
|
|
c523673885 | ||
|
|
caac4d63d3 | ||
|
|
29b0076fa8 | ||
|
|
c9aa1094a7 | ||
|
|
b86f69cb82 | ||
|
|
5d0bf78512 | ||
|
|
554df631c6 | ||
|
|
5e704bfdfb | ||
|
|
fe8cc02bfa | ||
|
|
061c033f52 | ||
|
|
832a7e7e4a | ||
|
|
b2371c4c02 | ||
|
|
b94a7c4b44 | ||
|
|
9b9027112d | ||
|
|
8e7889c66e | ||
|
|
d836c3788d | ||
|
|
1cb7c0293f | ||
|
|
52dabc1f79 | ||
|
|
2d78d41f7b |
@@ -14,7 +14,6 @@ Checks: "-*,
|
||||
bugprone-fold-init-type,
|
||||
bugprone-forward-declaration-namespace,
|
||||
bugprone-inaccurate-erase,
|
||||
bugprone-inc-dec-in-conditions,
|
||||
bugprone-incorrect-enable-if,
|
||||
bugprone-incorrect-roundings,
|
||||
bugprone-infinite-loop,
|
||||
@@ -65,10 +64,8 @@ Checks: "-*,
|
||||
bugprone-undefined-memory-manipulation,
|
||||
bugprone-undelegated-constructor,
|
||||
bugprone-unhandled-exception-at-new,
|
||||
bugprone-unhandled-self-assignment,
|
||||
bugprone-unique-ptr-array-mismatch,
|
||||
bugprone-unsafe-functions,
|
||||
bugprone-unused-raii,
|
||||
bugprone-unused-local-non-trivial-variable,
|
||||
bugprone-virtual-near-miss,
|
||||
cppcoreguidelines-no-suspend-with-lock,
|
||||
@@ -98,10 +95,12 @@ Checks: "-*,
|
||||
# checks that have some issues that need to be resolved:
|
||||
#
|
||||
# bugprone-crtp-constructor-accessibility,
|
||||
# bugprone-inc-dec-in-conditions,
|
||||
# bugprone-move-forwarding-reference,
|
||||
# bugprone-switch-missing-default-case,
|
||||
# bugprone-unused-return-value,
|
||||
# bugprone-use-after-move,
|
||||
# bugprone-unhandled-self-assignment,
|
||||
# bugprone-unused-raii,
|
||||
#
|
||||
# cppcoreguidelines-misleading-capture-default-by-value,
|
||||
|
||||
9
.github/actions/build-deps/action.yml
vendored
9
.github/actions/build-deps/action.yml
vendored
@@ -36,14 +36,7 @@ runs:
|
||||
SANITIZERS: ${{ inputs.sanitizers }}
|
||||
run: |
|
||||
echo 'Installing dependencies.'
|
||||
# Sanitizer builds (especially TSAN) need ASLR disabled because
|
||||
# dependency configure tests compile and run instrumented binaries
|
||||
# that crash with "unexpected memory mapping" on modern kernels.
|
||||
WRAPPER=""
|
||||
if [[ -n "${SANITIZERS}" && "$(uname)" == "Linux" ]]; then
|
||||
WRAPPER="setarch $(uname -m) -R"
|
||||
fi
|
||||
${WRAPPER} conan install \
|
||||
conan install \
|
||||
--profile ci \
|
||||
--build="${BUILD_OPTION}" \
|
||||
--options:host='&:tests=True' \
|
||||
|
||||
@@ -42,6 +42,7 @@ libxrpl.tx > xrpl.server
|
||||
libxrpl.tx > xrpl.tx
|
||||
test.app > test.jtx
|
||||
test.app > test.rpc
|
||||
test.app > test.shamap
|
||||
test.app > test.toplevel
|
||||
test.app > test.unit_test
|
||||
test.app > xrpl.basics
|
||||
@@ -57,6 +58,7 @@ test.app > xrpl.protocol
|
||||
test.app > xrpl.rdb
|
||||
test.app > xrpl.resource
|
||||
test.app > xrpl.server
|
||||
test.app > xrpl.shamap
|
||||
test.app > xrpl.tx
|
||||
test.basics > test.jtx
|
||||
test.basics > test.unit_test
|
||||
|
||||
27
.github/scripts/strategy-matrix/generate.py
vendored
27
.github/scripts/strategy-matrix/generate.py
vendored
@@ -55,7 +55,7 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
|
||||
# fee to 500.
|
||||
# - Bookworm using GCC 15: Debug on linux/amd64, enable code
|
||||
# coverage (which will be done below).
|
||||
# - Bookworm using Clang 16: Debug on linux/amd64, enable voidstar.
|
||||
# - Bookworm using Clang 16: Debug on linux/arm64, enable voidstar.
|
||||
# - Bookworm using Clang 17: Release on linux/amd64, set the
|
||||
# reference fee to 1000.
|
||||
# - Bookworm using Clang 20: Debug on linux/amd64.
|
||||
@@ -78,7 +78,7 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
|
||||
if (
|
||||
f"{os['compiler_name']}-{os['compiler_version']}" == "clang-16"
|
||||
and build_type == "Debug"
|
||||
and architecture["platform"] == "linux/amd64"
|
||||
and architecture["platform"] == "linux/arm64"
|
||||
):
|
||||
cmake_args = f"-Dvoidstar=ON {cmake_args}"
|
||||
skip = False
|
||||
@@ -235,16 +235,11 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
|
||||
# so that they are easier to identify in the GitHub Actions UI, as long
|
||||
# names get truncated.
|
||||
# Add Address and Thread (both coupled with UB) sanitizers for specific bookworm distros.
|
||||
# Note: GCC ASAN's detect_stack_use_after_return produces false positives with
|
||||
# Boost.Context fibers (boost::asio::spawn). Mitigated in reusable-build-test-config.yml
|
||||
# by setting detect_stack_use_after_return=0 for GCC.
|
||||
# See: https://github.com/google/sanitizers/issues/856
|
||||
if os[
|
||||
"distro_version"
|
||||
] == "bookworm" and f"{os['compiler_name']}-{os['compiler_version']}" in [
|
||||
"gcc-13",
|
||||
"clang-17",
|
||||
]:
|
||||
# GCC-Asan rippled-embedded tests are failing because of https://github.com/google/sanitizers/issues/856
|
||||
if (
|
||||
os["distro_version"] == "bookworm"
|
||||
and f"{os['compiler_name']}-{os['compiler_version']}" == "clang-20"
|
||||
):
|
||||
# Add ASAN + UBSAN configuration.
|
||||
configurations.append(
|
||||
{
|
||||
@@ -258,19 +253,19 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
|
||||
"sanitizers": "address,undefinedbehavior",
|
||||
}
|
||||
)
|
||||
# TSAN is activated on gcc-13 and clang-17.
|
||||
activate_tsan = True
|
||||
# TSAN is deactivated due to seg faults with latest compilers.
|
||||
activate_tsan = False
|
||||
if activate_tsan:
|
||||
configurations.append(
|
||||
{
|
||||
"config_name": config_name + "-tsan",
|
||||
"config_name": config_name + "-tsan-ubsan",
|
||||
"cmake_args": cmake_args,
|
||||
"cmake_target": cmake_target,
|
||||
"build_only": build_only,
|
||||
"build_type": build_type,
|
||||
"os": os,
|
||||
"architecture": architecture,
|
||||
"sanitizers": "thread",
|
||||
"sanitizers": "thread,undefinedbehavior",
|
||||
}
|
||||
)
|
||||
else:
|
||||
|
||||
5
.github/workflows/on-pr.yml
vendored
5
.github/workflows/on-pr.yml
vendored
@@ -141,8 +141,9 @@ jobs:
|
||||
needs:
|
||||
- should-run
|
||||
- build-test
|
||||
# Only run when committing to a PR that targets a release branch.
|
||||
if: ${{ github.repository == 'XRPLF/rippled' && needs.should-run.outputs.go == 'true' && startsWith(github.ref, 'refs/heads/release') }}
|
||||
# Only run when committing to a PR that targets a release branch in the
|
||||
# XRPLF repository.
|
||||
if: ${{ github.repository_owner == 'XRPLF' && needs.should-run.outputs.go == 'true' && startsWith(github.ref, 'refs/heads/release') }}
|
||||
uses: ./.github/workflows/reusable-upload-recipe.yml
|
||||
secrets:
|
||||
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
|
||||
|
||||
3
.github/workflows/on-tag.yml
vendored
3
.github/workflows/on-tag.yml
vendored
@@ -17,7 +17,8 @@ defaults:
|
||||
|
||||
jobs:
|
||||
upload-recipe:
|
||||
if: ${{ github.repository == 'XRPLF/rippled' }}
|
||||
# Only run when a tag is pushed to the XRPLF repository.
|
||||
if: ${{ github.repository_owner == 'XRPLF' }}
|
||||
uses: ./.github/workflows/reusable-upload-recipe.yml
|
||||
secrets:
|
||||
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
|
||||
|
||||
4
.github/workflows/on-trigger.yml
vendored
4
.github/workflows/on-trigger.yml
vendored
@@ -92,8 +92,8 @@ jobs:
|
||||
|
||||
upload-recipe:
|
||||
needs: build-test
|
||||
# Only run when pushing to the develop branch.
|
||||
if: ${{ github.repository == 'XRPLF/rippled' && github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
|
||||
# Only run when pushing to the develop branch in the XRPLF repository.
|
||||
if: ${{ github.repository_owner == 'XRPLF' && github.event_name == 'push' && github.ref == 'refs/heads/develop' }}
|
||||
uses: ./.github/workflows/reusable-upload-recipe.yml
|
||||
secrets:
|
||||
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
|
||||
|
||||
42
.github/workflows/reusable-build-test-config.yml
vendored
42
.github/workflows/reusable-build-test-config.yml
vendored
@@ -75,13 +75,8 @@ jobs:
|
||||
build-and-test:
|
||||
name: ${{ inputs.config_name }}
|
||||
runs-on: ${{ fromJSON(inputs.runs_on) }}
|
||||
# seccomp=unconfined is required for sanitizer builds so that
|
||||
# setarch -R can disable ASLR via the personality() syscall.
|
||||
# Without this, TSAN crashes with "unexpected memory mapping"
|
||||
# because modern kernels (5.18+) randomize library mappings
|
||||
# into address ranges that collide with TSAN's shadow memory.
|
||||
container: ${{ inputs.image != '' && fromJSON(format('{"image":"{0}","options":"{1}"}', inputs.image, inputs.sanitizers != '' && '--security-opt seccomp=unconfined' || '')) || null }}
|
||||
timeout-minutes: ${{ inputs.sanitizers != '' && 360 || 60 }}
|
||||
container: ${{ inputs.image != '' && inputs.image || null }}
|
||||
timeout-minutes: 60
|
||||
env:
|
||||
# Use a namespace to keep the objects separate for each configuration.
|
||||
CCACHE_NAMESPACE: ${{ inputs.config_name }}
|
||||
@@ -181,7 +176,7 @@ jobs:
|
||||
fi
|
||||
|
||||
- name: Upload the binary (Linux)
|
||||
if: ${{ github.repository == 'XRPLF/rippled' && runner.os == 'Linux' }}
|
||||
if: ${{ github.repository_owner == 'XRPLF' && runner.os == 'Linux' }}
|
||||
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
|
||||
with:
|
||||
name: xrpld-${{ inputs.config_name }}
|
||||
@@ -210,14 +205,7 @@ jobs:
|
||||
- name: Set sanitizer options
|
||||
if: ${{ !inputs.build_only && env.SANITIZERS_ENABLED == 'true' }}
|
||||
run: |
|
||||
ASAN_OPTS="print_stacktrace=1:detect_container_overflow=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/asan.supp"
|
||||
# GCC ASAN's detect_stack_use_after_return produces false positives with
|
||||
# Boost.Context fiber stack switching (used by boost::asio::spawn).
|
||||
# See: https://github.com/google/sanitizers/issues/856
|
||||
if [[ "${{ inputs.config_name }}" == *gcc* ]]; then
|
||||
ASAN_OPTS="${ASAN_OPTS}:detect_stack_use_after_return=0"
|
||||
fi
|
||||
echo "ASAN_OPTIONS=${ASAN_OPTS}" >> ${GITHUB_ENV}
|
||||
echo "ASAN_OPTIONS=print_stacktrace=1:detect_container_overflow=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/asan.supp" >> ${GITHUB_ENV}
|
||||
echo "TSAN_OPTIONS=second_deadlock_stack=1:halt_on_error=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/tsan.supp" >> ${GITHUB_ENV}
|
||||
echo "UBSAN_OPTIONS=suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/ubsan.supp" >> ${GITHUB_ENV}
|
||||
echo "LSAN_OPTIONS=suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/lsan.supp" >> ${GITHUB_ENV}
|
||||
@@ -225,18 +213,12 @@ jobs:
|
||||
- name: Run the separate tests
|
||||
if: ${{ !inputs.build_only }}
|
||||
working-directory: ${{ env.BUILD_DIR }}
|
||||
# Windows locks some of the build files while running tests, and parallel jobs can collide.
|
||||
# Sanitizer builds use single-threaded execution and setarch -R to disable ASLR
|
||||
# (TSAN's shadow memory requires deterministic address mappings).
|
||||
# Windows locks some of the build files while running tests, and parallel jobs can collide
|
||||
env:
|
||||
BUILD_TYPE: ${{ inputs.build_type }}
|
||||
PARALLELISM: ${{ env.SANITIZERS_ENABLED == 'true' && '1' || (runner.os == 'Windows' && '1' || steps.nproc.outputs.nproc) }}
|
||||
PARALLELISM: ${{ runner.os == 'Windows' && '1' || steps.nproc.outputs.nproc }}
|
||||
run: |
|
||||
WRAPPER=""
|
||||
if [[ "${SANITIZERS_ENABLED}" == "true" && "$(uname)" == "Linux" ]]; then
|
||||
WRAPPER="setarch $(uname -m) -R"
|
||||
fi
|
||||
${WRAPPER} ctest \
|
||||
ctest \
|
||||
--output-on-failure \
|
||||
-C "${BUILD_TYPE}" \
|
||||
-j "${PARALLELISM}"
|
||||
@@ -245,14 +227,10 @@ jobs:
|
||||
if: ${{ !inputs.build_only }}
|
||||
working-directory: ${{ runner.os == 'Windows' && format('{0}/{1}', env.BUILD_DIR, inputs.build_type) || env.BUILD_DIR }}
|
||||
env:
|
||||
BUILD_NPROC: ${{ env.SANITIZERS_ENABLED == 'true' && '1' || steps.nproc.outputs.nproc }}
|
||||
BUILD_NPROC: ${{ steps.nproc.outputs.nproc }}
|
||||
run: |
|
||||
set -o pipefail
|
||||
WRAPPER=""
|
||||
if [[ "${SANITIZERS_ENABLED}" == "true" && "$(uname)" == "Linux" ]]; then
|
||||
WRAPPER="setarch $(uname -m) -R"
|
||||
fi
|
||||
${WRAPPER} ./xrpld --unittest --unittest-jobs "${BUILD_NPROC}" 2>&1 | tee unittest.log
|
||||
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}" 2>&1 | tee unittest.log
|
||||
|
||||
- name: Show test failure summary
|
||||
if: ${{ failure() && !inputs.build_only }}
|
||||
@@ -288,7 +266,7 @@ jobs:
|
||||
--target coverage
|
||||
|
||||
- name: Upload coverage report
|
||||
if: ${{ github.repository == 'XRPLF/rippled' && !inputs.build_only && env.COVERAGE_ENABLED == 'true' }}
|
||||
if: ${{ github.repository_owner == 'XRPLF' && !inputs.build_only && env.COVERAGE_ENABLED == 'true' }}
|
||||
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
|
||||
with:
|
||||
disable_search: true
|
||||
|
||||
4
.github/workflows/upload-conan-deps.yml
vendored
4
.github/workflows/upload-conan-deps.yml
vendored
@@ -103,11 +103,11 @@ jobs:
|
||||
sanitizers: ${{ matrix.sanitizers }}
|
||||
|
||||
- name: Log into Conan remote
|
||||
if: ${{ github.repository == 'XRPLF/rippled' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
|
||||
if: ${{ github.repository_owner == 'XRPLF' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
|
||||
run: conan remote login "${CONAN_REMOTE_NAME}" "${{ secrets.CONAN_REMOTE_USERNAME }}" --password "${{ secrets.CONAN_REMOTE_PASSWORD }}"
|
||||
|
||||
- name: Upload Conan packages
|
||||
if: ${{ github.repository == 'XRPLF/rippled' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
|
||||
if: ${{ github.repository_owner == 'XRPLF' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}
|
||||
env:
|
||||
FORCE_OPTION: ${{ github.event.inputs.force_upload == 'true' && '--force' || '' }}
|
||||
run: conan upload "*" --remote="${CONAN_REMOTE_NAME}" --confirm ${FORCE_OPTION}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -131,6 +131,7 @@ if(coverage)
|
||||
include(XrplCov)
|
||||
endif()
|
||||
|
||||
set(PROJECT_EXPORT_SET XrplExports)
|
||||
include(XrplCore)
|
||||
include(XrplInstall)
|
||||
include(XrplValidatorKeys)
|
||||
|
||||
60
cmake/XrplConfig.cmake
Normal file
60
cmake/XrplConfig.cmake
Normal file
@@ -0,0 +1,60 @@
|
||||
include(CMakeFindDependencyMacro)
|
||||
# need to represent system dependencies of the lib here
|
||||
#[=========================================================[
|
||||
Boost
|
||||
#]=========================================================]
|
||||
if(static OR APPLE OR MSVC)
|
||||
set(Boost_USE_STATIC_LIBS ON)
|
||||
endif()
|
||||
set(Boost_USE_MULTITHREADED ON)
|
||||
if(static OR MSVC)
|
||||
set(Boost_USE_STATIC_RUNTIME ON)
|
||||
else()
|
||||
set(Boost_USE_STATIC_RUNTIME OFF)
|
||||
endif()
|
||||
find_dependency(
|
||||
Boost
|
||||
COMPONENTS
|
||||
chrono
|
||||
container
|
||||
context
|
||||
coroutine
|
||||
date_time
|
||||
filesystem
|
||||
program_options
|
||||
regex
|
||||
system
|
||||
thread
|
||||
)
|
||||
#[=========================================================[
|
||||
OpenSSL
|
||||
#]=========================================================]
|
||||
if(NOT DEFINED OPENSSL_ROOT_DIR)
|
||||
if(DEFINED ENV{OPENSSL_ROOT})
|
||||
set(OPENSSL_ROOT_DIR $ENV{OPENSSL_ROOT})
|
||||
elseif(APPLE)
|
||||
find_program(homebrew brew)
|
||||
if(homebrew)
|
||||
execute_process(
|
||||
COMMAND ${homebrew} --prefix openssl
|
||||
OUTPUT_VARIABLE OPENSSL_ROOT_DIR
|
||||
OUTPUT_STRIP_TRAILING_WHITESPACE
|
||||
)
|
||||
endif()
|
||||
endif()
|
||||
file(TO_CMAKE_PATH "${OPENSSL_ROOT_DIR}" OPENSSL_ROOT_DIR)
|
||||
endif()
|
||||
|
||||
if(static OR APPLE OR MSVC)
|
||||
set(OPENSSL_USE_STATIC_LIBS ON)
|
||||
endif()
|
||||
set(OPENSSL_MSVC_STATIC_RT ON)
|
||||
find_dependency(OpenSSL REQUIRED)
|
||||
find_dependency(ZLIB)
|
||||
find_dependency(date)
|
||||
if(TARGET ZLIB::ZLIB)
|
||||
set_target_properties(
|
||||
OpenSSL::Crypto
|
||||
PROPERTIES INTERFACE_LINK_LIBRARIES ZLIB::ZLIB
|
||||
)
|
||||
endif()
|
||||
@@ -2,38 +2,100 @@
|
||||
install stuff
|
||||
#]===================================================================]
|
||||
|
||||
include(GNUInstallDirs)
|
||||
include(create_symbolic_link)
|
||||
|
||||
if(is_root_project AND TARGET xrpld)
|
||||
install(
|
||||
TARGETS xrpld
|
||||
RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}" COMPONENT runtime
|
||||
)
|
||||
|
||||
install(
|
||||
FILES "${CMAKE_CURRENT_SOURCE_DIR}/cfg/xrpld-example.cfg"
|
||||
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}/xrpld"
|
||||
RENAME xrpld.cfg
|
||||
COMPONENT runtime
|
||||
)
|
||||
|
||||
install(
|
||||
FILES "${CMAKE_CURRENT_SOURCE_DIR}/cfg/validators-example.txt"
|
||||
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}/xrpld"
|
||||
RENAME validators.txt
|
||||
COMPONENT runtime
|
||||
)
|
||||
# If no suffix is defined for executables (e.g. Windows uses .exe but Linux
|
||||
# and macOS use none), then explicitly set it to the empty string.
|
||||
if(NOT DEFINED suffix)
|
||||
set(suffix "")
|
||||
endif()
|
||||
|
||||
install(
|
||||
TARGETS xrpl.libpb xrpl.libxrpl
|
||||
LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" COMPONENT development
|
||||
ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" COMPONENT development
|
||||
RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}" COMPONENT development
|
||||
TARGETS
|
||||
common
|
||||
opts
|
||||
xrpl_boost
|
||||
xrpl_libs
|
||||
xrpl_syslibs
|
||||
xrpl.imports.main
|
||||
xrpl.libpb
|
||||
xrpl.libxrpl
|
||||
xrpl.libxrpl.basics
|
||||
xrpl.libxrpl.beast
|
||||
xrpl.libxrpl.conditions
|
||||
xrpl.libxrpl.core
|
||||
xrpl.libxrpl.crypto
|
||||
xrpl.libxrpl.git
|
||||
xrpl.libxrpl.json
|
||||
xrpl.libxrpl.rdb
|
||||
xrpl.libxrpl.ledger
|
||||
xrpl.libxrpl.net
|
||||
xrpl.libxrpl.nodestore
|
||||
xrpl.libxrpl.protocol
|
||||
xrpl.libxrpl.resource
|
||||
xrpl.libxrpl.server
|
||||
xrpl.libxrpl.shamap
|
||||
xrpl.libxrpl.tx
|
||||
antithesis-sdk-cpp
|
||||
EXPORT XrplExports
|
||||
LIBRARY DESTINATION lib
|
||||
ARCHIVE DESTINATION lib
|
||||
RUNTIME DESTINATION bin
|
||||
INCLUDES DESTINATION include
|
||||
)
|
||||
|
||||
install(
|
||||
DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/include/xrpl"
|
||||
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}"
|
||||
COMPONENT development
|
||||
)
|
||||
|
||||
install(
|
||||
EXPORT XrplExports
|
||||
FILE XrplTargets.cmake
|
||||
NAMESPACE Xrpl::
|
||||
DESTINATION lib/cmake/xrpl
|
||||
)
|
||||
include(CMakePackageConfigHelpers)
|
||||
write_basic_package_version_file(
|
||||
XrplConfigVersion.cmake
|
||||
VERSION ${xrpld_version}
|
||||
COMPATIBILITY SameMajorVersion
|
||||
)
|
||||
|
||||
if(is_root_project AND TARGET xrpld)
|
||||
install(TARGETS xrpld RUNTIME DESTINATION bin)
|
||||
set_target_properties(xrpld PROPERTIES INSTALL_RPATH_USE_LINK_PATH ON)
|
||||
# sample configs should not overwrite existing files
|
||||
# install if-not-exists workaround as suggested by
|
||||
# https://cmake.org/Bug/view.php?id=12646
|
||||
install(
|
||||
CODE
|
||||
"
|
||||
macro (copy_if_not_exists SRC DEST NEWNAME)
|
||||
if (NOT EXISTS \"\$ENV{DESTDIR}\${CMAKE_INSTALL_PREFIX}/\${DEST}/\${NEWNAME}\")
|
||||
file (INSTALL FILE_PERMISSIONS OWNER_READ OWNER_WRITE DESTINATION \"\${CMAKE_INSTALL_PREFIX}/\${DEST}\" FILES \"\${SRC}\" RENAME \"\${NEWNAME}\")
|
||||
else ()
|
||||
message (\"-- Skipping : \$ENV{DESTDIR}\${CMAKE_INSTALL_PREFIX}/\${DEST}/\${NEWNAME}\")
|
||||
endif ()
|
||||
endmacro()
|
||||
copy_if_not_exists(\"${CMAKE_CURRENT_SOURCE_DIR}/cfg/xrpld-example.cfg\" etc xrpld.cfg)
|
||||
copy_if_not_exists(\"${CMAKE_CURRENT_SOURCE_DIR}/cfg/validators-example.txt\" etc validators.txt)
|
||||
"
|
||||
)
|
||||
install(
|
||||
CODE
|
||||
"
|
||||
set(CMAKE_MODULE_PATH \"${CMAKE_MODULE_PATH}\")
|
||||
include(create_symbolic_link)
|
||||
create_symbolic_link(xrpld${suffix} \
|
||||
\$ENV{DESTDIR}\${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_BINDIR}/rippled${suffix})
|
||||
"
|
||||
)
|
||||
endif()
|
||||
|
||||
install(
|
||||
FILES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/cmake/XrplConfig.cmake
|
||||
${CMAKE_CURRENT_BINARY_DIR}/XrplConfigVersion.cmake
|
||||
DESTINATION lib/cmake/xrpl
|
||||
)
|
||||
|
||||
@@ -23,6 +23,7 @@ target_compile_definitions(
|
||||
BOOST_FILESYSTEM_NO_DEPRECATED
|
||||
>
|
||||
$<$<NOT:$<BOOL:${boost_show_deprecated}>>:
|
||||
BOOST_COROUTINES_NO_DEPRECATION_WARNING
|
||||
BOOST_BEAST_ALLOW_DEPRECATED
|
||||
BOOST_FILESYSTEM_DEPRECATED
|
||||
>
|
||||
|
||||
@@ -50,13 +50,6 @@ if(MSVC AND CMAKE_GENERATOR_PLATFORM STREQUAL "Win32")
|
||||
message(FATAL_ERROR "Visual Studio 32-bit build is not supported.")
|
||||
endif()
|
||||
|
||||
if(voidstar AND NOT is_amd64)
|
||||
message(
|
||||
FATAL_ERROR
|
||||
"The voidstar library only supported on amd64/x86_64. Detected archictecture was: ${CMAKE_SYSTEM_PROCESSOR}"
|
||||
)
|
||||
endif()
|
||||
|
||||
if(APPLE AND NOT HOMEBREW)
|
||||
find_program(HOMEBREW brew)
|
||||
endif()
|
||||
|
||||
@@ -7,6 +7,7 @@ find_package(
|
||||
COMPONENTS
|
||||
chrono
|
||||
container
|
||||
coroutine
|
||||
date_time
|
||||
filesystem
|
||||
json
|
||||
@@ -25,6 +26,7 @@ target_link_libraries(
|
||||
Boost::headers
|
||||
Boost::chrono
|
||||
Boost::container
|
||||
Boost::coroutine
|
||||
Boost::date_time
|
||||
Boost::filesystem
|
||||
Boost::json
|
||||
|
||||
@@ -196,6 +196,7 @@ class Xrpl(ConanFile):
|
||||
"boost::headers",
|
||||
"boost::chrono",
|
||||
"boost::container",
|
||||
"boost::coroutine",
|
||||
"boost::date_time",
|
||||
"boost::filesystem",
|
||||
"boost::json",
|
||||
|
||||
@@ -71,14 +71,12 @@ words:
|
||||
- coldwallet
|
||||
- compr
|
||||
- conanfile
|
||||
- cppcoro
|
||||
- conanrun
|
||||
- confs
|
||||
- connectability
|
||||
- coro
|
||||
- coros
|
||||
- cowid
|
||||
- cppcoro
|
||||
- cryptocondition
|
||||
- cryptoconditional
|
||||
- cryptoconditions
|
||||
@@ -99,17 +97,13 @@ words:
|
||||
- doxyfile
|
||||
- dxrpl
|
||||
- endmacro
|
||||
- eventfd
|
||||
- exceptioned
|
||||
- Falco
|
||||
- fcontext
|
||||
- finalizers
|
||||
- firewalled
|
||||
- fcontext
|
||||
- fmtdur
|
||||
- fsanitize
|
||||
- funclets
|
||||
- gantt
|
||||
- gcov
|
||||
- gcovr
|
||||
- ghead
|
||||
@@ -193,7 +187,6 @@ words:
|
||||
- ostr
|
||||
- pargs
|
||||
- partitioner
|
||||
- pratik
|
||||
- paychan
|
||||
- paychans
|
||||
- permdex
|
||||
@@ -201,7 +194,6 @@ words:
|
||||
- permissioned
|
||||
- pointee
|
||||
- populator
|
||||
- pratik
|
||||
- preauth
|
||||
- preauthorization
|
||||
- preauthorize
|
||||
@@ -216,7 +208,6 @@ words:
|
||||
- queuable
|
||||
- Raphson
|
||||
- replayer
|
||||
- repost
|
||||
- rerere
|
||||
- retriable
|
||||
- RIPD
|
||||
@@ -247,7 +238,6 @@ words:
|
||||
- soci
|
||||
- socidb
|
||||
- sslws
|
||||
- stackful
|
||||
- statsd
|
||||
- STATSDCOLLECTOR
|
||||
- stissue
|
||||
@@ -266,7 +256,6 @@ words:
|
||||
- takerpays
|
||||
- ters
|
||||
- TMEndpointv2
|
||||
- TOCTOU
|
||||
- trixie
|
||||
- tx
|
||||
- txid
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
|
||||
@@ -231,4 +232,11 @@ makeSlice(std::basic_string<char, Traits, Alloc> const& s)
|
||||
return Slice(s.data(), s.size());
|
||||
}
|
||||
|
||||
template <class Traits>
|
||||
Slice
|
||||
makeSlice(std::basic_string_view<char, Traits> const& s)
|
||||
{
|
||||
return Slice(s.data(), s.size());
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
122
include/xrpl/core/Coro.ipp
Normal file
122
include/xrpl/core/Coro.ipp
Normal file
@@ -0,0 +1,122 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/basics/ByteUtilities.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
template <class F>
|
||||
JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f)
|
||||
: jq_(jq)
|
||||
, type_(type)
|
||||
, name_(name)
|
||||
, running_(false)
|
||||
, coro_(
|
||||
[this, fn = std::forward<F>(f)](
|
||||
boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
|
||||
yield_ = &do_yield;
|
||||
yield();
|
||||
fn(shared_from_this());
|
||||
#ifndef NDEBUG
|
||||
finished_ = true;
|
||||
#endif
|
||||
},
|
||||
boost::coroutines::attributes(megabytes(1)))
|
||||
{
|
||||
}
|
||||
|
||||
inline JobQueue::Coro::~Coro()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
|
||||
#endif
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::Coro::yield() const
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
++jq_.nSuspend_;
|
||||
}
|
||||
(*yield_)();
|
||||
}
|
||||
|
||||
inline bool
|
||||
JobQueue::Coro::post()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = true;
|
||||
}
|
||||
|
||||
// sp keeps 'this' alive
|
||||
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// The coroutine will not run. Clean up running_.
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
return false;
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::Coro::resume()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = true;
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
auto saved = detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(&lvs_);
|
||||
std::lock_guard lock(mutex_);
|
||||
XRPL_ASSERT(static_cast<bool>(coro_), "xrpl::JobQueue::Coro::resume : is runnable");
|
||||
coro_();
|
||||
detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(saved);
|
||||
std::lock_guard lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
inline bool
|
||||
JobQueue::Coro::runnable() const
|
||||
{
|
||||
return static_cast<bool>(coro_);
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::Coro::expectEarlyExit()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
if (!finished_)
|
||||
#endif
|
||||
{
|
||||
// expectEarlyExit() must only ever be called from outside the
|
||||
// Coro's stack. It you're inside the stack you can simply return
|
||||
// and be done.
|
||||
//
|
||||
// That said, since we're outside the Coro's stack, we need to
|
||||
// decrement the nSuspend that the Coro's call to yield caused.
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
#ifndef NDEBUG
|
||||
finished_ = true;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
inline void
|
||||
JobQueue::Coro::join()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_run_);
|
||||
cv_.wait(lk, [this]() { return running_ == false; });
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -1,699 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
template <typename T = void>
|
||||
class CoroTask;
|
||||
|
||||
/**
|
||||
* CoroTask<void> -- coroutine return type for void-returning coroutines.
|
||||
*
|
||||
* Class / Dependency Diagram
|
||||
* ==========================
|
||||
*
|
||||
* CoroTask<void>
|
||||
* +-----------------------------------------------+
|
||||
* | - handle_ : Handle (coroutine_handle<promise>) |
|
||||
* +-----------------------------------------------+
|
||||
* | + handle(), done() |
|
||||
* | + await_ready/suspend/resume (Awaiter iface) |
|
||||
* +-----------------------------------------------+
|
||||
* | owns
|
||||
* v
|
||||
* promise_type
|
||||
* +-----------------------------------------------+
|
||||
* | - exception_ : std::exception_ptr |
|
||||
* | - continuation_ : std::coroutine_handle<> |
|
||||
* +-----------------------------------------------+
|
||||
* | + get_return_object() -> CoroTask |
|
||||
* | + initial_suspend() -> suspend_always (lazy) |
|
||||
* | + final_suspend() -> FinalAwaiter |
|
||||
* | + return_void() |
|
||||
* | + unhandled_exception() |
|
||||
* +-----------------------------------------------+
|
||||
* | returns at final_suspend
|
||||
* v
|
||||
* FinalAwaiter
|
||||
* +-----------------------------------------------+
|
||||
* | await_suspend(h): |
|
||||
* | if continuation_ set -> symmetric transfer |
|
||||
* | else -> noop_coroutine |
|
||||
* +-----------------------------------------------+
|
||||
*
|
||||
* Design Notes
|
||||
* ------------
|
||||
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
|
||||
* body does not execute until the handle is explicitly resumed.
|
||||
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
|
||||
* of void/bool, allowing the scheduler to jump directly to the next
|
||||
* coroutine without growing the call stack.
|
||||
* - Continuation chaining: when one CoroTask is co_await-ed inside
|
||||
* another, the caller's handle is stored as continuation_ so
|
||||
* FinalAwaiter can resume it when this task finishes.
|
||||
* - Move-only: the handle is exclusively owned; copy is deleted.
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Basic void coroutine (the most common case in rippled):
|
||||
*
|
||||
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
|
||||
* // do something
|
||||
* co_await runner->suspend(); // yield control
|
||||
* // resumed later via runner->post() or runner->resume()
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* 2. co_await-ing one CoroTask<void> from another (chaining):
|
||||
*
|
||||
* CoroTask<void> inner() {
|
||||
* // ...
|
||||
* co_return;
|
||||
* }
|
||||
* CoroTask<void> outer() {
|
||||
* co_await inner(); // continuation_ links outer -> inner
|
||||
* co_return; // FinalAwaiter resumes outer
|
||||
* }
|
||||
*
|
||||
* 3. Exceptions propagate through co_await:
|
||||
*
|
||||
* CoroTask<void> failing() {
|
||||
* throw std::runtime_error("oops");
|
||||
* co_return;
|
||||
* }
|
||||
* CoroTask<void> caller() {
|
||||
* try { co_await failing(); }
|
||||
* catch (std::runtime_error const&) { // caught here }
|
||||
* }
|
||||
*
|
||||
* Caveats / Pitfalls
|
||||
* ==================
|
||||
*
|
||||
* BUG-RISK: Dangling references in coroutine parameters.
|
||||
* Coroutine parameters are copied into the frame, but references
|
||||
* are NOT -- they are stored as-is. If the referent goes out of scope
|
||||
* before the coroutine finishes, you get use-after-free.
|
||||
*
|
||||
* // BROKEN -- local dies before coroutine runs:
|
||||
* CoroTask<void> bad(int& ref) { co_return; }
|
||||
* void launch() {
|
||||
* int local = 42;
|
||||
* auto task = bad(local); // frame stores &local
|
||||
* } // local destroyed; frame holds dangling ref
|
||||
*
|
||||
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
|
||||
*
|
||||
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
|
||||
* When a lambda that returns CoroTask captures by reference ([&]),
|
||||
* GCC 14 may generate a corrupted coroutine frame. Always capture
|
||||
* by explicit pointer-to-value instead:
|
||||
*
|
||||
* // BROKEN on GCC 14:
|
||||
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
|
||||
*
|
||||
* // FIX -- capture pointers explicitly:
|
||||
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
|
||||
*
|
||||
* BUG-RISK: Resuming a destroyed or completed CoroTask.
|
||||
* Calling handle().resume() after the coroutine has already run to
|
||||
* completion (done() == true) is undefined behavior. The CoroTaskRunner
|
||||
* guards against this with an XRPL_ASSERT, but standalone usage of
|
||||
* CoroTask must check done() before resuming.
|
||||
*
|
||||
* BUG-RISK: Moving a CoroTask that is being awaited.
|
||||
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
|
||||
* or destroying A will invalidate the continuation link. Never move
|
||||
* or reassign a CoroTask while it is mid-execution or being awaited.
|
||||
*
|
||||
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
|
||||
* There is no built-in notification when the coroutine finishes.
|
||||
* The caller must use external synchronization (e.g. CoroTaskRunner::join
|
||||
* or a gate/condition_variable) to know when it is done.
|
||||
*
|
||||
* LIMITATION: No cancellation token.
|
||||
* There is no way to cancel a suspended CoroTask from outside. The
|
||||
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
|
||||
* after each co_await and co_return early if needed.
|
||||
*
|
||||
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
|
||||
* If a coroutine calls a regular function that wants to "yield", it
|
||||
* cannot. Only the immediate coroutine body can use co_await.
|
||||
* This is acceptable for rippled because all yield() sites are shallow.
|
||||
*/
|
||||
template <>
|
||||
class CoroTask<void>
|
||||
{
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
/**
|
||||
* Coroutine promise. Compiler uses this to manage coroutine state.
|
||||
* Stores the exception (if any) and the continuation handle for
|
||||
* symmetric transfer back to the awaiting coroutine.
|
||||
*/
|
||||
struct promise_type
|
||||
{
|
||||
// Captured exception from the coroutine body, rethrown in
|
||||
// await_resume() when this task is co_await-ed by a caller.
|
||||
std::exception_ptr exception_;
|
||||
|
||||
// Handle to the coroutine that is co_await-ing this task.
|
||||
// Set by await_suspend(). FinalAwaiter uses it for symmetric
|
||||
// transfer back to the caller. Null if this is a top-level task.
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
/**
|
||||
* Create the CoroTask return object.
|
||||
* Called by the compiler at coroutine creation.
|
||||
*/
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
/**
|
||||
* Lazy start. The coroutine body does not execute until the
|
||||
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
|
||||
*/
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaiter returned by final_suspend(). Uses symmetric transfer:
|
||||
* if a continuation exists, transfers control directly to it
|
||||
* (tail-call, no stack growth). Otherwise returns noop_coroutine
|
||||
* so the coroutine frame stays alive for the owner to destroy.
|
||||
*/
|
||||
struct FinalAwaiter
|
||||
{
|
||||
/**
|
||||
* Always false. We need await_suspend to run for
|
||||
* symmetric transfer.
|
||||
*/
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Symmetric transfer: returns the continuation handle so
|
||||
* the compiler emits a tail-call instead of a nested resume.
|
||||
* If no continuation is set, returns noop_coroutine to
|
||||
* suspend at final_suspend without destroying the frame.
|
||||
*
|
||||
* @param h Handle to this completing coroutine
|
||||
*
|
||||
* @return Continuation handle, or noop_coroutine
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns FinalAwaiter for symmetric transfer at coroutine end.
|
||||
*/
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the compiler for `co_return;` (void coroutine).
|
||||
*/
|
||||
void
|
||||
return_void()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the compiler when an exception escapes the coroutine
|
||||
* body. Captures it for later rethrowing in await_resume().
|
||||
*/
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
exception_ = std::current_exception();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Default constructor. Creates an empty (null handle) task.
|
||||
*/
|
||||
CoroTask() = default;
|
||||
|
||||
/**
|
||||
* Takes ownership of a compiler-generated coroutine handle.
|
||||
*
|
||||
* @param h Coroutine handle to own
|
||||
*/
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the coroutine frame if this task owns one.
|
||||
*/
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Move constructor. Transfers handle ownership, leaves other empty.
|
||||
*/
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Move assignment. Destroys current frame (if any), takes other's.
|
||||
*/
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
/**
|
||||
* @return The underlying coroutine_handle
|
||||
*/
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has run to completion (or thrown)
|
||||
*/
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
// -- Awaiter interface: allows `co_await someCoroTask;` --
|
||||
|
||||
/**
|
||||
* Always false. This task is lazy, so co_await always suspends
|
||||
* the caller to set up the continuation link.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the caller's handle as our continuation, then returns
|
||||
* our handle for symmetric transfer (caller suspends, we resume).
|
||||
*
|
||||
* @param caller Handle of the coroutine doing co_await on us
|
||||
*
|
||||
* @return Our handle for symmetric transfer
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
XRPL_ASSERT(handle_, "xrpl::CoroTask<void>::await_suspend : handle is valid");
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_; // Symmetric transfer
|
||||
}
|
||||
|
||||
/**
|
||||
* Called in the awaiting coroutine's context after this task
|
||||
* completes. Rethrows any exception captured by
|
||||
* unhandled_exception().
|
||||
*/
|
||||
void
|
||||
await_resume()
|
||||
{
|
||||
XRPL_ASSERT(handle_, "xrpl::CoroTask<void>::await_resume : handle is valid");
|
||||
if (auto& ep = handle_.promise().exception_)
|
||||
std::rethrow_exception(ep);
|
||||
}
|
||||
|
||||
private:
|
||||
// Exclusively-owned coroutine handle. Null after move or default
|
||||
// construction. Destroyed in the destructor.
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
/**
|
||||
* CoroTask<T> -- coroutine return type for value-returning coroutines.
|
||||
*
|
||||
* Class / Dependency Diagram
|
||||
* ==========================
|
||||
*
|
||||
* CoroTask<T>
|
||||
* +-----------------------------------------------+
|
||||
* | - handle_ : Handle (coroutine_handle<promise>) |
|
||||
* +-----------------------------------------------+
|
||||
* | + handle(), done() |
|
||||
* | + await_ready/suspend/resume (Awaiter iface) |
|
||||
* +-----------------------------------------------+
|
||||
* | owns
|
||||
* v
|
||||
* promise_type
|
||||
* +-----------------------------------------------+
|
||||
* | - result_ : variant<monostate, T, |
|
||||
* | exception_ptr> |
|
||||
* | - continuation_ : std::coroutine_handle<> |
|
||||
* +-----------------------------------------------+
|
||||
* | + get_return_object() -> CoroTask |
|
||||
* | + initial_suspend() -> suspend_always (lazy) |
|
||||
* | + final_suspend() -> FinalAwaiter |
|
||||
* | + return_value(T) -> stores in result_[1] |
|
||||
* | + unhandled_exception -> stores in result_[2] |
|
||||
* +-----------------------------------------------+
|
||||
* | returns at final_suspend
|
||||
* v
|
||||
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
|
||||
*
|
||||
* Value Extraction
|
||||
* ----------------
|
||||
* await_resume() inspects the variant:
|
||||
* - index 2 (exception_ptr) -> rethrow
|
||||
* - index 1 (T) -> return value via move
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Simple value return:
|
||||
*
|
||||
* CoroTask<int> computeAnswer() { co_return 42; }
|
||||
*
|
||||
* CoroTask<void> caller() {
|
||||
* int v = co_await computeAnswer(); // v == 42
|
||||
* }
|
||||
*
|
||||
* 2. Chaining value-returning coroutines:
|
||||
*
|
||||
* CoroTask<int> add(int a, int b) { co_return a + b; }
|
||||
* CoroTask<int> doubleSum(int a, int b) {
|
||||
* int s = co_await add(a, b);
|
||||
* co_return s * 2;
|
||||
* }
|
||||
*
|
||||
* 3. Exception propagation from inner to outer:
|
||||
*
|
||||
* CoroTask<int> failing() {
|
||||
* throw std::runtime_error("bad");
|
||||
* co_return 0; // never reached
|
||||
* }
|
||||
* CoroTask<void> caller() {
|
||||
* try {
|
||||
* int v = co_await failing(); // throws here
|
||||
* } catch (std::runtime_error const& e) {
|
||||
* // e.what() == "bad"
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
|
||||
* ================================================================
|
||||
*
|
||||
* BUG-RISK: await_resume() moves the value out of the variant.
|
||||
* Calling co_await on the same CoroTask<T> instance twice is undefined
|
||||
* behavior -- the second call will see a moved-from T. CoroTask is
|
||||
* single-shot: one co_return, one co_await.
|
||||
*
|
||||
* BUG-RISK: T must be move-constructible.
|
||||
* return_value(T) takes by value and moves into the variant.
|
||||
* Types that are not movable cannot be used as T.
|
||||
*
|
||||
* LIMITATION: No co_yield support.
|
||||
* CoroTask<T> only supports a single co_return. It does not implement
|
||||
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
|
||||
* compile error. For streaming values, a different return type
|
||||
* (e.g. Generator<T>) would be needed.
|
||||
*
|
||||
* LIMITATION: Result is only accessible via co_await.
|
||||
* There is no .get() or .result() method. The value can only be
|
||||
* extracted by co_await-ing the CoroTask<T> from inside another
|
||||
* coroutine. For extracting results in non-coroutine code, pass a
|
||||
* pointer to the caller and write through it (as the tests do).
|
||||
*/
|
||||
template <typename T>
|
||||
class CoroTask
|
||||
{
|
||||
static_assert(
|
||||
std::is_move_constructible_v<T>,
|
||||
"CoroTask<T> requires T to be move-constructible");
|
||||
|
||||
public:
|
||||
struct promise_type;
|
||||
using Handle = std::coroutine_handle<promise_type>;
|
||||
|
||||
/**
|
||||
* Coroutine promise for value-returning coroutines.
|
||||
* Stores the result as a variant: monostate (not yet set),
|
||||
* T (co_return value), or exception_ptr (unhandled exception).
|
||||
*/
|
||||
struct promise_type
|
||||
{
|
||||
// Tri-state result:
|
||||
// index 0 (monostate) -- coroutine has not yet completed
|
||||
// index 1 (T) -- co_return value stored here
|
||||
// index 2 (exception) -- unhandled exception captured here
|
||||
std::variant<std::monostate, T, std::exception_ptr> result_;
|
||||
|
||||
// Handle to the coroutine co_await-ing this task. Used by
|
||||
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
/**
|
||||
* Create the CoroTask return object.
|
||||
* Called by the compiler at coroutine creation.
|
||||
*/
|
||||
CoroTask
|
||||
get_return_object()
|
||||
{
|
||||
return CoroTask{Handle::from_promise(*this)};
|
||||
}
|
||||
|
||||
/**
|
||||
* Lazy start. Coroutine body does not run until explicitly resumed.
|
||||
*/
|
||||
std::suspend_always
|
||||
initial_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Symmetric-transfer awaiter at coroutine completion.
|
||||
* Same pattern as CoroTask<void>::FinalAwaiter.
|
||||
*/
|
||||
struct FinalAwaiter
|
||||
{
|
||||
bool
|
||||
await_ready() noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns continuation for symmetric transfer, or
|
||||
* noop_coroutine if this is a top-level task.
|
||||
*
|
||||
* @param h Handle to this completing coroutine
|
||||
*
|
||||
* @return Continuation handle, or noop_coroutine
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(Handle h) noexcept
|
||||
{
|
||||
if (auto cont = h.promise().continuation_)
|
||||
return cont;
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
FinalAwaiter
|
||||
final_suspend() noexcept
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the compiler for `co_return value;`.
|
||||
* Moves the value into result_ at index 1.
|
||||
*
|
||||
* @param value The value to store
|
||||
*/
|
||||
void
|
||||
return_value(T value)
|
||||
{
|
||||
result_.template emplace<1>(std::move(value));
|
||||
}
|
||||
|
||||
/**
|
||||
* Captures unhandled exceptions at index 2 of result_.
|
||||
* Rethrown later in await_resume().
|
||||
*/
|
||||
void
|
||||
unhandled_exception()
|
||||
{
|
||||
result_.template emplace<2>(std::current_exception());
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Default constructor. Creates an empty (null handle) task.
|
||||
*/
|
||||
CoroTask() = default;
|
||||
|
||||
/**
|
||||
* Takes ownership of a compiler-generated coroutine handle.
|
||||
*
|
||||
* @param h Coroutine handle to own
|
||||
*/
|
||||
explicit CoroTask(Handle h) : handle_(h)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the coroutine frame if this task owns one.
|
||||
*/
|
||||
~CoroTask()
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Move constructor. Transfers handle ownership, leaves other empty.
|
||||
*/
|
||||
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Move assignment. Destroys current frame (if any), takes other's.
|
||||
*/
|
||||
CoroTask&
|
||||
operator=(CoroTask&& other) noexcept
|
||||
{
|
||||
if (this != &other)
|
||||
{
|
||||
if (handle_)
|
||||
handle_.destroy();
|
||||
handle_ = std::exchange(other.handle_, {});
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoroTask(CoroTask const&) = delete;
|
||||
CoroTask&
|
||||
operator=(CoroTask const&) = delete;
|
||||
|
||||
/**
|
||||
* @return The underlying coroutine_handle
|
||||
*/
|
||||
Handle
|
||||
handle() const
|
||||
{
|
||||
return handle_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has run to completion (or thrown)
|
||||
*/
|
||||
bool
|
||||
done() const
|
||||
{
|
||||
return handle_ && handle_.done();
|
||||
}
|
||||
|
||||
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
|
||||
|
||||
/**
|
||||
* Always false. co_await always suspends to set up continuation.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores caller as continuation, returns our handle for
|
||||
* symmetric transfer.
|
||||
*
|
||||
* @param caller Handle of the coroutine doing co_await on us
|
||||
*
|
||||
* @return Our handle for symmetric transfer
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> caller) noexcept
|
||||
{
|
||||
XRPL_ASSERT(handle_, "xrpl::CoroTask<T>::await_suspend : handle is valid");
|
||||
handle_.promise().continuation_ = caller;
|
||||
return handle_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the result: rethrows if exception, otherwise moves
|
||||
* the T value out of the variant. Single-shot: calling twice
|
||||
* on the same task is undefined (moved-from T).
|
||||
*
|
||||
* @return The co_return-ed value
|
||||
*/
|
||||
T
|
||||
await_resume()
|
||||
{
|
||||
XRPL_ASSERT(handle_, "xrpl::CoroTask<T>::await_resume : handle is valid");
|
||||
auto& result = handle_.promise().result_;
|
||||
if (auto* ep = std::get_if<2>(&result))
|
||||
std::rethrow_exception(*ep);
|
||||
return std::get<1>(std::move(result));
|
||||
}
|
||||
|
||||
private:
|
||||
// Exclusively-owned coroutine handle. Null after move or default
|
||||
// construction. Destroyed in the destructor.
|
||||
Handle handle_;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -1,378 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* @file CoroTaskRunner.ipp
|
||||
*
|
||||
* CoroTaskRunner inline implementation.
|
||||
*
|
||||
* This file contains the business logic for managing C++20 coroutines
|
||||
* on the JobQueue. It is included at the bottom of JobQueue.h.
|
||||
*
|
||||
* Data Flow: suspend / post / resume cycle
|
||||
* =========================================
|
||||
*
|
||||
* coroutine body CoroTaskRunner JobQueue
|
||||
* -------------- -------------- --------
|
||||
* |
|
||||
* co_await runner->suspend()
|
||||
* |
|
||||
* +--- await_suspend ------> onSuspend()
|
||||
* | ++nSuspend_ ------------> nSuspend_
|
||||
* | [coroutine is now suspended]
|
||||
* |
|
||||
* . (externally or by yieldAndPost())
|
||||
* .
|
||||
* +--- (caller calls) -----> post()
|
||||
* | ++runCount_
|
||||
* | addJob(resume) ----------> job enqueued
|
||||
* | |
|
||||
* | [worker picks up]
|
||||
* | |
|
||||
* +--- <----- resume() <-----------------------------------+
|
||||
* | --nSuspend_ ------> nSuspend_
|
||||
* | swap in LocalValues (lvs_)
|
||||
* | task_.handle().resume()
|
||||
* | |
|
||||
* | [coroutine body continues here]
|
||||
* | |
|
||||
* | swap out LocalValues
|
||||
* | --runCount_
|
||||
* | cv_.notify_all()
|
||||
* v
|
||||
*
|
||||
* Thread Safety
|
||||
* =============
|
||||
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
|
||||
* races cannot resume the coroutine while it is still running.
|
||||
* (See the race condition discussion in JobQueue.h)
|
||||
* - mutex_run_ : guards runCount_ counter; used by join() to wait until
|
||||
* all in-flight resume operations complete.
|
||||
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
|
||||
*
|
||||
* Common Mistakes When Modifying This File
|
||||
* =========================================
|
||||
*
|
||||
* 1. Changing lock ordering.
|
||||
* resume() acquires locks sequentially (never held simultaneously):
|
||||
* jq_.m_mutex (released immediately), then mutex_ (held across resume),
|
||||
* then mutex_run_ (released after decrement). post() acquires only
|
||||
* mutex_run_. Any new code path must follow the same order.
|
||||
*
|
||||
* 2. Removing the shared_from_this() capture in post().
|
||||
* The lambda passed to addJob captures [this, sp = shared_from_this()].
|
||||
* If you remove sp, 'this' can be destroyed before the job runs,
|
||||
* causing use-after-free. The sp capture is load-bearing.
|
||||
*
|
||||
* 3. Forgetting to decrement nSuspend_ on a new code path.
|
||||
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
|
||||
* suspension path (e.g. a new awaiter) and forget to decrement on resume
|
||||
* or on failure, JobQueue::stop() will hang.
|
||||
*
|
||||
* 4. Calling task_.handle().resume() without holding mutex_.
|
||||
* This allows a race where the coroutine runs on two threads
|
||||
* simultaneously. Always hold mutex_ around resume().
|
||||
*
|
||||
* 5. Swapping LocalValues outside of the mutex_ critical section.
|
||||
* The swap-in and swap-out of LocalValues must bracket the resume()
|
||||
* call. If you move the swap-out before the lock_guard(mutex_) is
|
||||
* released, you break LocalValue isolation for any code that runs
|
||||
* after the coroutine suspends but before the lock is dropped.
|
||||
*/
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* Construct a CoroTaskRunner. Sets runCount_ to 0; does not
|
||||
* create the coroutine. Call init() afterwards.
|
||||
*
|
||||
* @param jq The JobQueue this coroutine will run on
|
||||
* @param type Job type for scheduling priority
|
||||
* @param name Human-readable name for logging
|
||||
*/
|
||||
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
|
||||
create_t,
|
||||
JobQueue& jq,
|
||||
JobType type,
|
||||
std::string const& name)
|
||||
: jq_(jq), type_(type), name_(name), runCount_(0)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize with a coroutine-returning callable.
|
||||
* Stores the callable on the heap (FuncStore) so it outlives the
|
||||
* coroutine frame. Coroutine frames store a reference to the
|
||||
* callable's implicit object parameter (the lambda). If the callable
|
||||
* is a temporary, that reference dangles after the caller returns.
|
||||
* Keeping the callable alive here ensures the coroutine's captures
|
||||
* remain valid.
|
||||
*
|
||||
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
|
||||
*/
|
||||
template <class F>
|
||||
void
|
||||
JobQueue::CoroTaskRunner::init(F&& f)
|
||||
{
|
||||
using Fn = std::decay_t<F>;
|
||||
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
|
||||
task_ = store->func(shared_from_this());
|
||||
storedFunc_ = std::move(store);
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor. Waits for any in-flight resume() to complete, then
|
||||
* asserts (debug) that the coroutine has finished or
|
||||
* expectEarlyExit() was called.
|
||||
*
|
||||
* The join() call is necessary because with async dispatch the
|
||||
* coroutine runs on a worker thread. The gate signal (which wakes
|
||||
* the test thread) can arrive before resume() has set finished_.
|
||||
* join() synchronizes via mutex_run_, establishing a happens-before
|
||||
* edge: finished_ = true -> unlock(mutex_run_) in resume() ->
|
||||
* lock(mutex_run_) in join() -> read finished_.
|
||||
*/
|
||||
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
join();
|
||||
XRPL_ASSERT(
|
||||
finished_.load(std::memory_order_acquire),
|
||||
"xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
++jq_.nSuspend_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement nSuspend_ without resuming.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::onUndoSuspend()
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
|
||||
* before the coroutine actually suspends. The caller must later call
|
||||
* post() or resume() to continue execution.
|
||||
*
|
||||
* @return Awaiter for use with `co_await runner->suspend()`
|
||||
*/
|
||||
inline auto
|
||||
JobQueue::CoroTaskRunner::suspend()
|
||||
{
|
||||
/**
|
||||
* Custom awaiter for suspend(). Always suspends (await_ready
|
||||
* returns false) and increments nSuspend_ in await_suspend().
|
||||
*/
|
||||
struct SuspendAwaiter
|
||||
{
|
||||
CoroTaskRunner& runner_; // The runner that owns this coroutine.
|
||||
|
||||
/**
|
||||
* Always returns false so the coroutine suspends.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the coroutine suspends. Increments nSuspend_
|
||||
* so the JobQueue knows a coroutine is waiting.
|
||||
*/
|
||||
void
|
||||
await_suspend(std::coroutine_handle<>) const
|
||||
{
|
||||
runner_.onSuspend();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
return SuspendAwaiter{*this};
|
||||
}
|
||||
|
||||
/**
|
||||
* Suspend and immediately repost on the JobQueue. Equivalent to
|
||||
* `co_await JobQueueAwaiter{runner}` but uses an inline struct
|
||||
* to work around a GCC-12 codegen bug (see declaration in JobQueue.h).
|
||||
*
|
||||
* If the JobQueue is stopping (post fails), the suspend count is
|
||||
* undone and the coroutine is resumed immediately via h.resume().
|
||||
*
|
||||
* @return An inline YieldPostAwaiter
|
||||
*/
|
||||
inline auto
|
||||
JobQueue::CoroTaskRunner::yieldAndPost()
|
||||
{
|
||||
struct YieldPostAwaiter
|
||||
{
|
||||
CoroTaskRunner& runner_;
|
||||
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
await_suspend(std::coroutine_handle<> h)
|
||||
{
|
||||
runner_.onSuspend();
|
||||
if (!runner_.post())
|
||||
{
|
||||
runner_.onUndoSuspend();
|
||||
h.resume();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
return YieldPostAwaiter{*this};
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule coroutine resumption as a job on the JobQueue.
|
||||
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
|
||||
* destroyed while the job is queued but not yet executed.
|
||||
*
|
||||
* @return false if the JobQueue rejected the job (shutting down)
|
||||
*/
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::post()
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(mutex_run_);
|
||||
++runCount_;
|
||||
}
|
||||
|
||||
// sp prevents 'this' from being destroyed while the job is pending
|
||||
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// The coroutine will not run. Undo the runCount_ increment.
|
||||
std::lock_guard lk(mutex_run_);
|
||||
--runCount_;
|
||||
cv_.notify_all();
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume the coroutine on the current thread.
|
||||
*
|
||||
* Steps:
|
||||
* 1. Decrement nSuspend_ (under jq_.m_mutex)
|
||||
* 2. Swap in this coroutine's LocalValues for thread-local isolation
|
||||
* 3. Resume the coroutine handle (under mutex_)
|
||||
* 4. Swap out LocalValues, restoring the thread's previous state
|
||||
* 5. Decrement runCount_ and notify join() waiters
|
||||
*
|
||||
* @pre post() must have been called before resume(). Direct calls
|
||||
* without a prior post() will corrupt runCount_ and break join().
|
||||
* Note: runCount_ is NOT incremented here — post() already did that.
|
||||
* This ensures join() stays blocked for the entire post->resume lifetime.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::resume()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
}
|
||||
auto saved = detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(&lvs_);
|
||||
std::lock_guard lock(mutex_);
|
||||
XRPL_ASSERT(
|
||||
task_.handle() && !task_.done(),
|
||||
"xrpl::JobQueue::CoroTaskRunner::resume : task handle is valid and not done");
|
||||
task_.handle().resume();
|
||||
detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(saved);
|
||||
if (task_.done())
|
||||
{
|
||||
finished_.store(true, std::memory_order_release);
|
||||
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
|
||||
// Use std::move (not task_ = {}) so task_.handle_ is null BEFORE the
|
||||
// frame is destroyed. operator= would destroy the frame while handle_
|
||||
// still holds the old value -- a re-entrancy hazard on GCC-12 if
|
||||
// frame destruction triggers runner cleanup.
|
||||
[[maybe_unused]] auto completed = std::move(task_);
|
||||
}
|
||||
std::lock_guard lk(mutex_run_);
|
||||
--runCount_;
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has not yet run to completion.
|
||||
*
|
||||
* Uses the atomic finished_ flag instead of reading task_ directly,
|
||||
* because task_ is modified in resume() under mutex_ and reading it
|
||||
* here without a lock would be a data race visible to TSAN.
|
||||
*/
|
||||
inline bool
|
||||
JobQueue::CoroTaskRunner::runnable() const
|
||||
{
|
||||
return !finished_.load(std::memory_order_acquire);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle early termination when the coroutine never ran (e.g. JobQueue
|
||||
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
|
||||
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::expectEarlyExit()
|
||||
{
|
||||
if (!finished_.load(std::memory_order_acquire))
|
||||
{
|
||||
std::lock_guard lock(jq_.m_mutex);
|
||||
--jq_.nSuspend_;
|
||||
finished_.store(true, std::memory_order_release);
|
||||
}
|
||||
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
|
||||
// The coroutine is at initial_suspend and never ran user code, so
|
||||
// destroying it is safe. Use std::move (not task_ = {}) so
|
||||
// task_.handle_ is null before the frame is destroyed.
|
||||
{
|
||||
[[maybe_unused]] auto completed = std::move(task_);
|
||||
}
|
||||
storedFunc_.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Block until all pending/active resume operations complete.
|
||||
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0 or
|
||||
* finished_ becomes true. The finished_ check handles the case
|
||||
* where resume() is called directly (without post()), which
|
||||
* decrements runCount_ below zero. In that scenario runCount_
|
||||
* never returns to 0, but finished_ becoming true guarantees
|
||||
* the coroutine is done and no more resumes will occur.
|
||||
*/
|
||||
inline void
|
||||
JobQueue::CoroTaskRunner::join()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_run_);
|
||||
cv_.wait(lk, [this]() { return runCount_ == 0 || finished_.load(std::memory_order_acquire); });
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -2,13 +2,13 @@
|
||||
|
||||
#include <xrpl/basics/LocalValue.h>
|
||||
#include <xrpl/core/ClosureCounter.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobTypeData.h>
|
||||
#include <xrpl/core/JobTypes.h>
|
||||
#include <xrpl/core/detail/Workers.h>
|
||||
#include <xrpl/json/json_value.h>
|
||||
|
||||
#include <coroutine>
|
||||
#include <boost/coroutine/all.hpp>
|
||||
|
||||
#include <set>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -18,6 +18,10 @@ class PerfLog;
|
||||
}
|
||||
|
||||
class Logs;
|
||||
struct Coro_create_t
|
||||
{
|
||||
explicit Coro_create_t() = default;
|
||||
};
|
||||
|
||||
/** A pool of threads to perform work.
|
||||
|
||||
@@ -32,418 +36,85 @@ class Logs;
|
||||
class JobQueue : private Workers::Callback
|
||||
{
|
||||
public:
|
||||
/** C++20 coroutine lifecycle manager.
|
||||
*
|
||||
* Class / Inheritance / Dependency Diagram
|
||||
* =========================================
|
||||
*
|
||||
* std::enable_shared_from_this<CoroTaskRunner>
|
||||
* ^
|
||||
* | (public inheritance)
|
||||
* |
|
||||
* CoroTaskRunner
|
||||
* +---------------------------------------------------+
|
||||
* | - lvs_ : detail::LocalValues |
|
||||
* | - jq_ : JobQueue& |
|
||||
* | - type_ : JobType |
|
||||
* | - name_ : std::string |
|
||||
* | - runCount_ : int (in-flight resumes) |
|
||||
* | - mutex_ : std::mutex (coroutine guard) |
|
||||
* | - mutex_run_ : std::mutex (join guard) |
|
||||
* | - cv_ : condition_variable |
|
||||
* | - task_ : CoroTask<void> |
|
||||
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
|
||||
* +---------------------------------------------------+
|
||||
* | + init(F&&) : set up coroutine callable |
|
||||
* | + onSuspend() : ++jq_.nSuspend_ |
|
||||
* | + onUndoSuspend() : --jq_.nSuspend_ |
|
||||
* | + suspend() : returns SuspendAwaiter |
|
||||
* | + post() : schedule resume on JobQueue |
|
||||
* | + resume() : resume coroutine on caller |
|
||||
* | + runnable() : !task_.done() |
|
||||
* | + expectEarlyExit() : teardown for failed post |
|
||||
* | + join() : block until not running |
|
||||
* +---------------------------------------------------+
|
||||
* | |
|
||||
* | owns | references
|
||||
* v v
|
||||
* CoroTask<void> JobQueue
|
||||
* (coroutine frame) (thread pool + nSuspend_)
|
||||
*
|
||||
* FuncBase / FuncStore<F> (type-erased heap storage
|
||||
* for the coroutine lambda)
|
||||
*
|
||||
* Coroutine Lifecycle (Control Flow)
|
||||
* ===================================
|
||||
*
|
||||
* Caller thread JobQueue worker thread
|
||||
* ------------- ----------------------
|
||||
* postCoroTask(f)
|
||||
* |
|
||||
* +-- check stopping_ (reject if JQ shutting down)
|
||||
* +-- ++nSuspend_ (lazy start counts as suspended)
|
||||
* +-- make_shared<CoroTaskRunner>
|
||||
* +-- init(f)
|
||||
* | +-- store lambda on heap (FuncStore)
|
||||
* | +-- task_ = f(shared_from_this())
|
||||
* | [coroutine created, suspended at initial_suspend]
|
||||
* +-- post()
|
||||
* | +-- ++runCount_
|
||||
* | +-- addJob(type_, [resume]{})
|
||||
* | resume()
|
||||
* | |
|
||||
* | +-- --nSuspend_
|
||||
* | +-- swap in LocalValues
|
||||
* | +-- task_.handle().resume()
|
||||
* | | [coroutine body runs]
|
||||
* | | ...
|
||||
* | | co_await suspend()
|
||||
* | | +-- ++nSuspend_
|
||||
* | | [coroutine suspends]
|
||||
* | +-- swap out LocalValues
|
||||
* | +-- --runCount_
|
||||
* | +-- cv_.notify_all()
|
||||
* |
|
||||
* post() <-- called externally or by yieldAndPost()
|
||||
* +-- ++runCount_
|
||||
* +-- addJob(type_, [resume]{})
|
||||
* resume()
|
||||
* |
|
||||
* +-- [coroutine body continues]
|
||||
* +-- co_return
|
||||
* +-- --runCount_
|
||||
* +-- cv_.notify_all()
|
||||
* join()
|
||||
* +-- cv_.wait([]{runCount_ == 0})
|
||||
* +-- [done]
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Fire-and-forget coroutine (most common pattern):
|
||||
*
|
||||
* jq.postCoroTask(jtCLIENT, "MyWork",
|
||||
* [](auto runner) -> CoroTask<void> {
|
||||
* doSomeWork();
|
||||
* co_await runner->suspend(); // yield to other jobs
|
||||
* doMoreWork();
|
||||
* co_return;
|
||||
* });
|
||||
*
|
||||
* 2. Manually controlling suspend / resume (external trigger):
|
||||
*
|
||||
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
|
||||
* [&result](auto runner) -> CoroTask<void> {
|
||||
* startAsyncOperation(callback);
|
||||
* co_await runner->suspend();
|
||||
* // callback called runner->post() to get here
|
||||
* result = collectResult();
|
||||
* co_return;
|
||||
* });
|
||||
* // ... later, from the callback:
|
||||
* runner->post(); // reschedule the coroutine on the JobQueue
|
||||
*
|
||||
* 3. Using yieldAndPost() for automatic suspend + repost:
|
||||
*
|
||||
* jq.postCoroTask(jtCLIENT, "AutoRepost",
|
||||
* [](auto runner) -> CoroTask<void> {
|
||||
* step1();
|
||||
* co_await runner->yieldAndPost(); // yield + auto-repost
|
||||
* step2();
|
||||
* co_await runner->yieldAndPost();
|
||||
* step3();
|
||||
* co_return;
|
||||
* });
|
||||
*
|
||||
* 4. Checking shutdown after co_await (cooperative cancellation):
|
||||
*
|
||||
* jq.postCoroTask(jtCLIENT, "Cancellable",
|
||||
* [&jq](auto runner) -> CoroTask<void> {
|
||||
* while (moreWork()) {
|
||||
* co_await runner->yieldAndPost();
|
||||
* if (jq.isStopping())
|
||||
* co_return; // bail out cleanly
|
||||
* processNextItem();
|
||||
* }
|
||||
* co_return;
|
||||
* });
|
||||
*
|
||||
* Caveats / Pitfalls
|
||||
* ==================
|
||||
*
|
||||
* BUG-RISK: Calling suspend() without a matching post()/resume().
|
||||
* After co_await runner->suspend(), the coroutine is parked and
|
||||
* nSuspend_ is incremented. If nothing ever calls post() or
|
||||
* resume(), the coroutine is leaked and JobQueue::stop() will
|
||||
* hang forever waiting for nSuspend_ to reach zero.
|
||||
*
|
||||
* BUG-RISK: Calling post() on an already-running coroutine.
|
||||
* post() schedules a resume() job. If the coroutine has not
|
||||
* actually suspended yet (no co_await executed), the resume job
|
||||
* will try to call handle().resume() while the coroutine is still
|
||||
* running on another thread. This is UB. The mutex_ prevents
|
||||
* data corruption but the logic is wrong — always co_await
|
||||
* suspend() before calling post(). (The test incorrect_order()
|
||||
* shows this works only because mutex_ serializes the calls.)
|
||||
*
|
||||
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
|
||||
* The CoroTaskRunner destructor asserts that finished_ is true
|
||||
* (the coroutine completed). If you let the last shared_ptr die
|
||||
* while the coroutine is still running or suspended, you get an
|
||||
* assertion failure in debug and UB in release. Always call
|
||||
* join() or expectEarlyExit() first.
|
||||
*
|
||||
* BUG-RISK: Lambda captures outliving the coroutine frame.
|
||||
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
|
||||
* to prevent dangling. But objects captured by pointer still need
|
||||
* their own lifetime management. If you capture a raw pointer to
|
||||
* a stack variable, and the stack frame exits before the coroutine
|
||||
* finishes, the pointer dangles. Use shared_ptr or ensure the
|
||||
* pointed-to object outlives the coroutine.
|
||||
*
|
||||
* BUG-RISK: Forgetting co_return in a void coroutine.
|
||||
* If the coroutine body falls off the end without co_return,
|
||||
* the compiler may silently treat it as co_return (per standard),
|
||||
* but some compilers warn. Always write explicit co_return.
|
||||
*
|
||||
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
|
||||
* The task_ member is CoroTask<void>. To return values from
|
||||
* the top-level coroutine, write through a captured pointer
|
||||
* (as the tests demonstrate), or co_await inner CoroTask<T>
|
||||
* coroutines that return values.
|
||||
*
|
||||
* LIMITATION: One coroutine per CoroTaskRunner.
|
||||
* init() must be called exactly once. You cannot reuse a
|
||||
* CoroTaskRunner to run a second coroutine. Create a new one
|
||||
* via postCoroTask() instead.
|
||||
*
|
||||
* LIMITATION: No timeout on join().
|
||||
* join() blocks indefinitely. If the coroutine is suspended
|
||||
* and never posted, join() will deadlock. Use timed waits
|
||||
* on the gate pattern (condition_variable + wait_for) in tests.
|
||||
*/
|
||||
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
|
||||
/** Coroutines must run to completion. */
|
||||
class Coro : public std::enable_shared_from_this<Coro>
|
||||
{
|
||||
private:
|
||||
// Per-coroutine thread-local storage. Swapped in before resume()
|
||||
// and swapped out after, so each coroutine sees its own LocalValue
|
||||
// state regardless of which worker thread executes it.
|
||||
detail::LocalValues lvs_;
|
||||
|
||||
// Back-reference to the owning JobQueue. Used to post jobs,
|
||||
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
|
||||
JobQueue& jq_;
|
||||
|
||||
// Job type passed to addJob() when posting this coroutine.
|
||||
JobType type_;
|
||||
|
||||
// Human-readable name for this coroutine job (for logging).
|
||||
std::string name_;
|
||||
|
||||
// Number of in-flight resume operations (pending + active).
|
||||
// Incremented by post(), decremented when resume() finishes.
|
||||
// Guarded by mutex_run_. join() blocks until this reaches 0.
|
||||
//
|
||||
// A counter (not a bool) is needed because post() can be called
|
||||
// from within the coroutine body (e.g. via yieldAndPost()),
|
||||
// enqueuing a second resume while the first is still running.
|
||||
// A bool would be clobbered: R2.post() sets true, then R1's
|
||||
// cleanup sets false — losing the fact that R2 is still pending.
|
||||
int runCount_;
|
||||
|
||||
// Serializes all coroutine resume() calls, preventing concurrent
|
||||
// execution of the coroutine body on multiple threads. Handles the
|
||||
// race where post() enqueues a resume before the coroutine has
|
||||
// actually suspended (post-before-suspend pattern).
|
||||
bool running_;
|
||||
std::mutex mutex_;
|
||||
|
||||
// Guards runCount_. Used with cv_ for join() to wait
|
||||
// until all pending/active resume operations complete.
|
||||
std::mutex mutex_run_;
|
||||
|
||||
// Notified when runCount_ reaches zero, allowing
|
||||
// join() waiters to wake up.
|
||||
std::condition_variable cv_;
|
||||
|
||||
// The coroutine handle wrapper. Owns the coroutine frame.
|
||||
// Set by init(). Reset to empty in resume() upon coroutine
|
||||
// completion (to break the shared_ptr cycle) or in
|
||||
// expectEarlyExit() on early termination.
|
||||
CoroTask<void> task_;
|
||||
|
||||
/**
|
||||
* Type-erased base for heap-stored callables.
|
||||
* Prevents the coroutine lambda from being destroyed before
|
||||
* the coroutine frame is done with it.
|
||||
*
|
||||
* @see FuncStore
|
||||
*/
|
||||
struct FuncBase
|
||||
{
|
||||
virtual ~FuncBase() = default;
|
||||
};
|
||||
|
||||
/**
|
||||
* Concrete type-erased storage for a callable of type F.
|
||||
* The coroutine frame stores a reference to the lambda's implicit
|
||||
* object parameter. If the lambda is a temporary, that reference
|
||||
* dangles after the call returns. FuncStore keeps it alive on
|
||||
* the heap for the lifetime of the CoroTaskRunner.
|
||||
*/
|
||||
template <class F>
|
||||
struct FuncStore : FuncBase
|
||||
{
|
||||
F func; // The stored callable (coroutine lambda).
|
||||
explicit FuncStore(F&& f) : func(std::move(f))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
// Heap-allocated callable storage. Set by init(), ensures the
|
||||
// lambda outlives the coroutine frame that references it.
|
||||
std::unique_ptr<FuncBase> storedFunc_;
|
||||
|
||||
// True once the coroutine has completed or expectEarlyExit() was
|
||||
// called. Asserted in the destructor (debug) to catch leaked
|
||||
// runners. Available in all builds to guard expectEarlyExit()
|
||||
// against double-decrementing nSuspend_.
|
||||
// Atomic to allow lock-free reads from runnable(), join(), and
|
||||
// the destructor without requiring the same mutex that guards
|
||||
// the write in resume().
|
||||
std::atomic<bool> finished_{false};
|
||||
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
|
||||
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
|
||||
#ifndef NDEBUG
|
||||
bool finished_ = false;
|
||||
#endif
|
||||
|
||||
public:
|
||||
/**
|
||||
* Tag type for private construction. Prevents external code
|
||||
* from constructing CoroTaskRunner directly. Use postCoroTask().
|
||||
*/
|
||||
struct create_t
|
||||
{
|
||||
explicit create_t() = default;
|
||||
};
|
||||
|
||||
/**
|
||||
* Construct a CoroTaskRunner. Private by convention (create_t tag).
|
||||
*
|
||||
* @param jq The JobQueue this coroutine will run on
|
||||
* @param type Job type for scheduling priority
|
||||
* @param name Human-readable name for logging
|
||||
*/
|
||||
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
|
||||
|
||||
CoroTaskRunner(CoroTaskRunner const&) = delete;
|
||||
CoroTaskRunner&
|
||||
operator=(CoroTaskRunner const&) = delete;
|
||||
|
||||
/**
|
||||
* Destructor. Asserts (debug) that the coroutine has finished
|
||||
* or expectEarlyExit() was called.
|
||||
*/
|
||||
~CoroTaskRunner();
|
||||
|
||||
/**
|
||||
* Initialize with a coroutine-returning callable.
|
||||
* Must be called exactly once, after the object is managed by
|
||||
* shared_ptr (because init uses shared_from_this internally).
|
||||
* This is handled automatically by postCoroTask().
|
||||
*
|
||||
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
|
||||
*/
|
||||
// Private: Used in the implementation
|
||||
template <class F>
|
||||
Coro(Coro_create_t, JobQueue&, JobType, std::string const&, F&&);
|
||||
|
||||
// Not copy-constructible or assignable
|
||||
Coro(Coro const&) = delete;
|
||||
Coro&
|
||||
operator=(Coro const&) = delete;
|
||||
|
||||
~Coro();
|
||||
|
||||
/** Suspend coroutine execution.
|
||||
Effects:
|
||||
The coroutine's stack is saved.
|
||||
The associated Job thread is released.
|
||||
Note:
|
||||
The associated Job function returns.
|
||||
Undefined behavior if called consecutively without a corresponding
|
||||
post.
|
||||
*/
|
||||
void
|
||||
init(F&& f);
|
||||
yield() const;
|
||||
|
||||
/**
|
||||
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
|
||||
* Called when the coroutine is about to suspend. Every call
|
||||
* must be balanced by a corresponding decrement (via resume()
|
||||
* or onUndoSuspend()), or JobQueue::stop() will hang.
|
||||
*/
|
||||
void
|
||||
onSuspend();
|
||||
/** Schedule coroutine execution.
|
||||
Effects:
|
||||
Returns immediately.
|
||||
A new job is scheduled to resume the execution of the coroutine.
|
||||
When the job runs, the coroutine's stack is restored and execution
|
||||
continues at the beginning of coroutine function or the
|
||||
statement after the previous call to yield. Undefined behavior if
|
||||
called after the coroutine has completed with a return (as opposed to
|
||||
a yield()). Undefined behavior if post() or resume() called
|
||||
consecutively without a corresponding yield.
|
||||
|
||||
/**
|
||||
* Decrement nSuspend_ without resuming.
|
||||
* Used to undo onSuspend() when a scheduled post() fails
|
||||
* (e.g. JobQueue is stopping).
|
||||
*/
|
||||
void
|
||||
onUndoSuspend();
|
||||
|
||||
/**
|
||||
* Suspend the coroutine.
|
||||
* The awaiter's await_suspend() increments nSuspend_ before the
|
||||
* coroutine actually suspends. The caller must later call post()
|
||||
* or resume() to continue execution.
|
||||
*
|
||||
* @return An awaiter for use with `co_await runner->suspend()`
|
||||
*/
|
||||
auto
|
||||
suspend();
|
||||
|
||||
/**
|
||||
* Suspend the coroutine and immediately repost it on the
|
||||
* JobQueue. Combines suspend() + post() atomically inside
|
||||
* await_suspend, so there is no window where an external
|
||||
* event could race between the two.
|
||||
*
|
||||
* Equivalent to JobQueueAwaiter but defined as an inline
|
||||
* awaiter returned from a member function. This avoids a
|
||||
* GCC-12 coroutine codegen bug where an external awaiter
|
||||
* struct (JobQueueAwaiter) used at multiple co_await points
|
||||
* corrupts the coroutine state machine's resume index,
|
||||
* causing the coroutine to hang on the third resumption.
|
||||
*
|
||||
* @return An awaiter for use with `co_await runner->yieldAndPost()`
|
||||
*/
|
||||
auto
|
||||
yieldAndPost();
|
||||
|
||||
/**
|
||||
* Schedule coroutine resumption as a job on the JobQueue.
|
||||
* Captures shared_from_this() to prevent this runner from being
|
||||
* destroyed while the job is queued.
|
||||
*
|
||||
* @return true if the job was accepted; false if the JobQueue
|
||||
* is stopping (caller must handle cleanup)
|
||||
*/
|
||||
@return true if the Coro's job is added to the JobQueue.
|
||||
*/
|
||||
bool
|
||||
post();
|
||||
|
||||
/**
|
||||
* Resume the coroutine on the current thread.
|
||||
* Decrements nSuspend_, swaps in LocalValues, resumes the
|
||||
* coroutine handle, swaps out LocalValues, and notifies join()
|
||||
* waiters. Lock ordering (sequential, non-overlapping):
|
||||
* jq_.m_mutex -> mutex_ -> mutex_run_.
|
||||
*
|
||||
* @pre post() must have been called before resume(). Direct
|
||||
* calls without a prior post() will corrupt runCount_
|
||||
* and break join().
|
||||
*/
|
||||
/** Resume coroutine execution.
|
||||
Effects:
|
||||
The coroutine continues execution from where it last left off
|
||||
using this same thread.
|
||||
Undefined behavior if called after the coroutine has completed
|
||||
with a return (as opposed to a yield()).
|
||||
Undefined behavior if resume() or post() called consecutively
|
||||
without a corresponding yield.
|
||||
*/
|
||||
void
|
||||
resume();
|
||||
|
||||
/**
|
||||
* @return true if the coroutine has not yet run to completion
|
||||
*/
|
||||
/** Returns true if the Coro is still runnable (has not returned). */
|
||||
bool
|
||||
runnable() const;
|
||||
|
||||
/**
|
||||
* Handle early termination when the coroutine never ran.
|
||||
* Decrements nSuspend_ and destroys the coroutine frame to
|
||||
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
|
||||
* Called by postCoroTask() when post() fails.
|
||||
*/
|
||||
/** Once called, the Coro allows early exit without an assert. */
|
||||
void
|
||||
expectEarlyExit();
|
||||
|
||||
/**
|
||||
* Block until all pending/active resume operations complete.
|
||||
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
|
||||
* Warning: deadlocks if the coroutine is suspended and never posted.
|
||||
*/
|
||||
/** Waits until coroutine returns from the user function. */
|
||||
void
|
||||
join();
|
||||
};
|
||||
@@ -481,18 +152,18 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Creates a C++20 coroutine and adds a job to the queue to run it.
|
||||
/** Creates a coroutine and adds a job to the queue which will run it.
|
||||
|
||||
@param t The type of job.
|
||||
@param name Name of the job.
|
||||
@param f Callable with signature
|
||||
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
|
||||
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the
|
||||
job executes.
|
||||
|
||||
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
|
||||
@return shared_ptr to posted Coro. nullptr if post was not successful.
|
||||
*/
|
||||
template <class F>
|
||||
std::shared_ptr<CoroTaskRunner>
|
||||
postCoroTask(JobType t, std::string const& name, F&& f);
|
||||
std::shared_ptr<Coro>
|
||||
postCoro(JobType t, std::string const& name, F&& f);
|
||||
|
||||
/** Jobs waiting at this priority.
|
||||
*/
|
||||
@@ -546,6 +217,8 @@ public:
|
||||
isStopped() const;
|
||||
|
||||
private:
|
||||
friend class Coro;
|
||||
|
||||
using JobDataMap = std::map<JobType, JobTypeData>;
|
||||
|
||||
beast::Journal m_journal;
|
||||
@@ -646,75 +319,86 @@ private:
|
||||
getJobLimit(JobType type);
|
||||
};
|
||||
|
||||
/*
|
||||
An RPC command is received and is handled via ServerHandler(HTTP) or
|
||||
Handler(websocket), depending on the connection type. The handler then calls
|
||||
the JobQueue::postCoro() method to create a coroutine and run it at a later
|
||||
point. This frees up the handler thread and allows it to continue handling
|
||||
other requests while the RPC command completes its work asynchronously.
|
||||
|
||||
postCoro() creates a Coro object. When the Coro ctor is called, and its
|
||||
coro_ member is initialized (a boost::coroutines::pull_type), execution
|
||||
automatically passes to the coroutine, which we don't want at this point,
|
||||
since we are still in the handler thread context. It's important to note
|
||||
here that construction of a boost pull_type automatically passes execution to
|
||||
the coroutine. A pull_type object automatically generates a push_type that is
|
||||
passed as a parameter (do_yield) in the signature of the function the
|
||||
pull_type was created with. This function is immediately called during coro_
|
||||
construction and within it, Coro::yield_ is assigned the push_type
|
||||
parameter (do_yield) address and called (yield()) so we can return execution
|
||||
back to the caller's stack.
|
||||
|
||||
postCoro() then calls Coro::post(), which schedules a job on the job
|
||||
queue to continue execution of the coroutine in a JobQueue worker thread at
|
||||
some later time. When the job runs, we lock on the Coro::mutex_ and call
|
||||
coro_ which continues where we had left off. Since we the last thing we did
|
||||
in coro_ was call yield(), the next thing we continue with is calling the
|
||||
function param f, that was passed into Coro ctor. It is within this
|
||||
function body that the caller specifies what he would like to do while
|
||||
running in the coroutine and allow them to suspend and resume execution.
|
||||
A task that relies on other events to complete, such as path finding, calls
|
||||
Coro::yield() to suspend its execution while waiting on those events to
|
||||
complete and continue when signaled via the Coro::post() method.
|
||||
|
||||
There is a potential race condition that exists here where post() can get
|
||||
called before yield() after f is called. Technically the problem only occurs
|
||||
if the job that post() scheduled is executed before yield() is called.
|
||||
If the post() job were to be executed before yield(), undefined behavior
|
||||
would occur. The lock ensures that coro_ is not called again until we exit
|
||||
the coroutine. At which point a scheduled resume() job waiting on the lock
|
||||
would gain entry, harmlessly call coro_ and immediately return as we have
|
||||
already completed the coroutine.
|
||||
|
||||
The race condition occurs as follows:
|
||||
|
||||
1- The coroutine is running.
|
||||
2- The coroutine is about to suspend, but before it can do so, it must
|
||||
arrange for some event to wake it up.
|
||||
3- The coroutine arranges for some event to wake it up.
|
||||
4- Before the coroutine can suspend, that event occurs and the
|
||||
resumption of the coroutine is scheduled on the job queue. 5- Again, before
|
||||
the coroutine can suspend, the resumption of the coroutine is dispatched. 6-
|
||||
Again, before the coroutine can suspend, the resumption code runs the
|
||||
coroutine.
|
||||
The coroutine is now running in two threads.
|
||||
|
||||
The lock prevents this from happening as step 6 will block until the
|
||||
lock is released which only happens after the coroutine completes.
|
||||
*/
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
#include <xrpl/core/CoroTaskRunner.ipp>
|
||||
#include <xrpl/core/Coro.ipp>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
|
||||
//
|
||||
// Control Flow
|
||||
// ============
|
||||
//
|
||||
// postCoroTask(t, name, f)
|
||||
// |
|
||||
// +-- 1. Check stopping_ — reject if JQ shutting down
|
||||
// |
|
||||
// +-- 2. ++nSuspend_ (the coroutine uses lazy-start, so it is
|
||||
// | "suspended" from the JQ's perspective before its first resume.
|
||||
// | This keeps the JQ shutdown logic correct — it waits for
|
||||
// | nSuspend_ to reach 0).
|
||||
// |
|
||||
// +-- 3. Create CoroTaskRunner (shared_ptr, ref-counted)
|
||||
// |
|
||||
// +-- 4. runner->init(f)
|
||||
// | +-- Heap-allocate the lambda (FuncStore) to prevent
|
||||
// | | dangling captures in the coroutine frame
|
||||
// | +-- task_ = f(shared_from_this())
|
||||
// | [coroutine created but NOT started — lazy initial_suspend]
|
||||
// |
|
||||
// +-- 5. runner->post()
|
||||
// | +-- addJob(type_, [resume]{}) → resume on worker thread
|
||||
// | +-- failure (JQ stopping):
|
||||
// | +-- runner->expectEarlyExit()
|
||||
// | | --nSuspend_, destroy coroutine frame
|
||||
// | +-- return nullptr
|
||||
//
|
||||
// Why async post() instead of synchronous resume()?
|
||||
// ==================================================
|
||||
// The initial dispatch MUST use async post() so the coroutine body runs on
|
||||
// a JobQueue worker thread, not the caller's thread. resume() swaps the
|
||||
// caller's thread-local LocalValues with the coroutine's private copy.
|
||||
// If the coroutine mutates LocalValues (e.g. thread_specific_storage test),
|
||||
// those mutations bleed back into the caller's thread-local state after the
|
||||
// swap-out, corrupting subsequent tests that share the same thread pool.
|
||||
// Async post() avoids this by running the coroutine on a worker thread whose
|
||||
// LocalValues are managed by the thread pool, not by the caller.
|
||||
//
|
||||
template <class F>
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner>
|
||||
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
|
||||
std::shared_ptr<JobQueue::Coro>
|
||||
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
||||
{
|
||||
// Reject if the JQ is shutting down and atomically increment
|
||||
// nSuspend_ under the same lock. Without the lock, a TOCTOU race
|
||||
// exists: stopping_ could become true between the check and the
|
||||
// increment, leaving an orphan nSuspend_ that causes stop() to hang.
|
||||
/* First param is a detail type to make construction private.
|
||||
Last param is the function the coroutine runs. Signature of
|
||||
void(std::shared_ptr<Coro>).
|
||||
*/
|
||||
auto coro = std::make_shared<Coro>(Coro_create_t{}, *this, t, name, std::forward<F>(f));
|
||||
if (!coro->post())
|
||||
{
|
||||
std::lock_guard lock(m_mutex);
|
||||
if (stopping_)
|
||||
return nullptr;
|
||||
++nSuspend_;
|
||||
// The Coro was not successfully posted. Disable it so it's destructor
|
||||
// can run with no negative side effects. Then destroy it.
|
||||
coro->expectEarlyExit();
|
||||
coro.reset();
|
||||
}
|
||||
|
||||
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
|
||||
runner->init(std::forward<F>(f));
|
||||
if (!runner->post())
|
||||
{
|
||||
runner->expectEarlyExit();
|
||||
runner.reset();
|
||||
}
|
||||
return runner;
|
||||
return coro;
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
@@ -1,206 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
|
||||
#include <coroutine>
|
||||
#include <memory>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* Awaiter that suspends and immediately reschedules on the JobQueue.
|
||||
* Equivalent to calling yield() followed by post() in the old Coro API.
|
||||
*
|
||||
* Usage:
|
||||
* co_await JobQueueAwaiter{runner};
|
||||
*
|
||||
* What it waits for: The coroutine is re-queued as a job and resumes
|
||||
* when a worker thread picks it up.
|
||||
*
|
||||
* Which thread resumes: A JobQueue worker thread.
|
||||
*
|
||||
* What await_resume() returns: void.
|
||||
*
|
||||
* Dependency Diagram
|
||||
* ==================
|
||||
*
|
||||
* JobQueueAwaiter
|
||||
* +----------------------------------------------+
|
||||
* | + runner : shared_ptr<CoroTaskRunner> |
|
||||
* +----------------------------------------------+
|
||||
* | + await_ready() -> false (always suspend) |
|
||||
* | + await_suspend() -> bool (suspend or cancel) |
|
||||
* | + await_resume() -> void |
|
||||
* +----------------------------------------------+
|
||||
* | |
|
||||
* | uses | uses
|
||||
* v v
|
||||
* CoroTaskRunner JobQueue
|
||||
* .onSuspend() (via runner->post() -> addJob)
|
||||
* .onUndoSuspend()
|
||||
* .post()
|
||||
*
|
||||
* Control Flow (await_suspend)
|
||||
* ============================
|
||||
*
|
||||
* co_await JobQueueAwaiter{runner}
|
||||
* |
|
||||
* +-- await_ready() -> false
|
||||
* +-- await_suspend(handle)
|
||||
* |
|
||||
* +-- runner->onSuspend() // ++nSuspend_
|
||||
* +-- runner->post() // addJob to JobQueue
|
||||
* | |
|
||||
* | +-- success? return noop_coroutine()
|
||||
* | | // coroutine stays suspended;
|
||||
* | | // worker thread will call resume()
|
||||
* | +-- failure? (JQ stopping)
|
||||
* | +-- runner->onUndoSuspend() // --nSuspend_
|
||||
* | +-- return handle // symmetric transfer back
|
||||
* | // coroutine continues immediately
|
||||
* | // so it can clean up and co_return
|
||||
*
|
||||
* DEPRECATED — prefer `co_await runner->yieldAndPost()`
|
||||
* =====================================================
|
||||
*
|
||||
* GCC-12 has a coroutine codegen bug where using this external awaiter
|
||||
* struct at multiple co_await points in the same coroutine corrupts the
|
||||
* state machine's resume index. After the second co_await, the third
|
||||
* resumption enters handle().resume() but never reaches await_resume()
|
||||
* or any subsequent user code — the coroutine hangs indefinitely.
|
||||
*
|
||||
* The fix is `co_await runner->yieldAndPost()`, which defines the
|
||||
* awaiter as an inline struct inside a CoroTaskRunner member function.
|
||||
* GCC-12 handles inline awaiters correctly at multiple co_await points.
|
||||
*
|
||||
* This struct is retained for single-use scenarios and documentation
|
||||
* purposes. For any code that may use co_await in a loop or at
|
||||
* multiple points, always use `runner->yieldAndPost()`.
|
||||
*
|
||||
* Usage Examples
|
||||
* ==============
|
||||
*
|
||||
* 1. Yield and auto-repost (preferred — works on all compilers):
|
||||
*
|
||||
* CoroTask<void> handler(auto runner) {
|
||||
* doPartA();
|
||||
* co_await runner->yieldAndPost(); // yield + repost
|
||||
* doPartB(); // runs on a worker thread
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* 2. Multiple yield points in a loop:
|
||||
*
|
||||
* CoroTask<void> batchProcessor(auto runner) {
|
||||
* for (auto& item : items) {
|
||||
* process(item);
|
||||
* co_await runner->yieldAndPost(); // let other jobs run
|
||||
* }
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* 3. Graceful shutdown — checking after resume:
|
||||
*
|
||||
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
|
||||
* while (hasWork()) {
|
||||
* co_await runner->yieldAndPost();
|
||||
* // If JQ is stopping, await_suspend resumes the coroutine
|
||||
* // immediately without re-queuing. Always check
|
||||
* // isStopping() to decide whether to proceed:
|
||||
* if (jq.isStopping())
|
||||
* co_return;
|
||||
* doNextChunk();
|
||||
* }
|
||||
* co_return;
|
||||
* }
|
||||
*
|
||||
* Caveats / Pitfalls
|
||||
* ==================
|
||||
*
|
||||
* BUG-RISK: Using a stale or null runner.
|
||||
* The runner shared_ptr must be valid and point to the CoroTaskRunner
|
||||
* that owns the coroutine currently executing. Passing a runner from
|
||||
* a different coroutine, or a default-constructed shared_ptr, is UB.
|
||||
*
|
||||
* BUG-RISK: Assuming resume happens on the same thread.
|
||||
* After co_await, the coroutine resumes on whatever worker thread
|
||||
* picks up the job. Do not rely on thread-local state unless it is
|
||||
* managed through LocalValue (which CoroTaskRunner automatically
|
||||
* swaps in/out).
|
||||
*
|
||||
* BUG-RISK: Ignoring the shutdown path.
|
||||
* When the JobQueue is stopping, post() fails and await_suspend()
|
||||
* resumes the coroutine immediately (symmetric transfer back to h).
|
||||
* The coroutine body continues on the same thread. If your code
|
||||
* after co_await assumes it was re-queued and is running on a worker
|
||||
* thread, that assumption breaks during shutdown. Always handle the
|
||||
* "JQ is stopping" case, either by checking jq.isStopping() or by
|
||||
* letting the coroutine fall through to co_return naturally.
|
||||
*
|
||||
* DIFFERENCE from runner->suspend() + runner->post():
|
||||
* Both JobQueueAwaiter and yieldAndPost() combine suspend + post
|
||||
* in one atomic operation. With the manual suspend()/post() pattern,
|
||||
* there is a window between the two calls where an external event
|
||||
* could race. The atomic awaiters remove that window — onSuspend()
|
||||
* and post() happen within the same await_suspend() call while the
|
||||
* coroutine is guaranteed to be suspended. Use yieldAndPost() unless
|
||||
* you need an external party to decide *when* to call post().
|
||||
*/
|
||||
struct JobQueueAwaiter
|
||||
{
|
||||
// The CoroTaskRunner that owns the currently executing coroutine.
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
|
||||
|
||||
/**
|
||||
* Always returns false so the coroutine suspends.
|
||||
*/
|
||||
bool
|
||||
await_ready() const noexcept
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment nSuspend (equivalent to yield()) and schedule resume
|
||||
* on the JobQueue (equivalent to post()). If the JobQueue is
|
||||
* stopping, undoes the suspend count and transfers back to the
|
||||
* coroutine so it can clean up and co_return.
|
||||
*
|
||||
* Returns a coroutine_handle<> (symmetric transfer) instead of
|
||||
* bool to work around a GCC-12 codegen bug where bool-returning
|
||||
* await_suspend leaves the coroutine in an invalid state —
|
||||
* neither properly suspended nor resumed — causing a hang.
|
||||
*
|
||||
* WARNING: GCC-12 has an additional codegen bug where using this
|
||||
* external awaiter struct at multiple co_await points in the same
|
||||
* coroutine corrupts the state machine's resume index, causing the
|
||||
* coroutine to hang on the third resumption. Prefer
|
||||
* `co_await runner->yieldAndPost()` which uses an inline awaiter
|
||||
* that GCC-12 handles correctly.
|
||||
*
|
||||
* @return noop_coroutine() to stay suspended (job posted);
|
||||
* the caller's handle to resume immediately (JQ stopping)
|
||||
*/
|
||||
std::coroutine_handle<>
|
||||
await_suspend(std::coroutine_handle<> h)
|
||||
{
|
||||
XRPL_ASSERT(runner, "xrpl::JobQueueAwaiter::await_suspend : runner is valid");
|
||||
runner->onSuspend();
|
||||
if (!runner->post())
|
||||
{
|
||||
// JobQueue is stopping. Undo the suspend count and
|
||||
// transfer back to the coroutine so it can clean up
|
||||
// and co_return.
|
||||
runner->onUndoSuspend();
|
||||
return h;
|
||||
}
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
void
|
||||
await_resume() const noexcept
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -244,7 +244,15 @@ message TMGetObjectByHash {
|
||||
|
||||
message TMLedgerNode {
|
||||
required bytes nodedata = 1;
|
||||
optional bytes nodeid = 2; // missing for ledger base data
|
||||
|
||||
// Used when protocol version <2.3. Not set for ledger base data.
|
||||
optional bytes nodeid = 2;
|
||||
|
||||
// Used when protocol version >=2.3. Neither value is set for ledger base data.
|
||||
oneof reference {
|
||||
bytes id = 3; // Set for inner nodes.
|
||||
uint32 depth = 4; // Set for leaf nodes.
|
||||
}
|
||||
}
|
||||
|
||||
enum TMLedgerInfoType {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
#include <set>
|
||||
#include <stack>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -253,7 +254,7 @@ public:
|
||||
bool
|
||||
getNodeFat(
|
||||
SHAMapNodeID const& wanted,
|
||||
std::vector<std::pair<SHAMapNodeID, Blob>>& data,
|
||||
std::vector<std::tuple<SHAMapNodeID, Blob, bool>>& data,
|
||||
bool fatLeaves,
|
||||
std::uint32_t depth) const;
|
||||
|
||||
@@ -280,10 +281,45 @@ public:
|
||||
void
|
||||
serializeRoot(Serializer& s) const;
|
||||
|
||||
/** Add a root node to the SHAMap during synchronization.
|
||||
*
|
||||
* This function is used when receiving the root node of a SHAMap from a peer during ledger
|
||||
* synchronization. The node must already have been deserialized.
|
||||
*
|
||||
* @param hash The expected hash of the root node.
|
||||
* @param rootNode A deserialized root node to add.
|
||||
* @param filter Optional sync filter to track received nodes.
|
||||
* @return Status indicating whether the node was useful, duplicate, or invalid.
|
||||
*
|
||||
* @note This function expects the rootNode to be a valid, deserialized SHAMapTreeNode. The
|
||||
* caller is responsible for deserialization and basic validation before calling this
|
||||
* function.
|
||||
*/
|
||||
SHAMapAddNode
|
||||
addRootNode(SHAMapHash const& hash, Slice const& rootNode, SHAMapSyncFilter* filter);
|
||||
addRootNode(
|
||||
SHAMapHash const& hash,
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode> rootNode,
|
||||
SHAMapSyncFilter const* filter);
|
||||
|
||||
/** Add a known node at a specific position in the SHAMap during synchronization.
|
||||
*
|
||||
* This function is used when receiving nodes from peers during ledger synchronization. The node
|
||||
* is inserted at the position specified by nodeID. The node must already have been
|
||||
* deserialized.
|
||||
*
|
||||
* @param nodeID The position in the tree where this node belongs.
|
||||
* @param treeNode A deserialized tree node to add.
|
||||
* @param filter Optional sync filter to track received nodes.
|
||||
* @return Status indicating whether the node was useful, duplicate, or invalid.
|
||||
*
|
||||
* @note This function expects that the caller has already validated that the nodeID is
|
||||
* consistent with the node's content.
|
||||
*/
|
||||
SHAMapAddNode
|
||||
addKnownNode(SHAMapNodeID const& nodeID, Slice const& rawNode, SHAMapSyncFilter* filter);
|
||||
addKnownNode(
|
||||
SHAMapNodeID const& nodeID,
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode> treeNode,
|
||||
SHAMapSyncFilter const* filter);
|
||||
|
||||
// status functions
|
||||
void
|
||||
@@ -344,11 +380,11 @@ private:
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
fetchNodeNT(SHAMapHash const& hash) const;
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const;
|
||||
fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter const* filter) const;
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
fetchNode(SHAMapHash const& hash) const;
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
checkFilter(SHAMapHash const& hash, SHAMapSyncFilter* filter) const;
|
||||
checkFilter(SHAMapHash const& hash, SHAMapSyncFilter const* filter) const;
|
||||
|
||||
/** Update hashes up to the root */
|
||||
void
|
||||
@@ -420,7 +456,7 @@ private:
|
||||
descendAsync(
|
||||
SHAMapInnerNode* parent,
|
||||
int branch,
|
||||
SHAMapSyncFilter* filter,
|
||||
SHAMapSyncFilter const* filter,
|
||||
bool& pending,
|
||||
descendCallback&&) const;
|
||||
|
||||
@@ -429,7 +465,7 @@ private:
|
||||
SHAMapInnerNode* parent,
|
||||
SHAMapNodeID const& parentID,
|
||||
int branch,
|
||||
SHAMapSyncFilter* filter) const;
|
||||
SHAMapSyncFilter const* filter) const;
|
||||
|
||||
// Non-storing
|
||||
// Does not hook the returned node to its parent
|
||||
|
||||
@@ -1,45 +1,102 @@
|
||||
# TSAN suppression file for rippled.
|
||||
# Only suppress issues in third-party libraries and TSAN's own instrumentation.
|
||||
# Races in rippled's own code should be fixed, not suppressed.
|
||||
# The idea is to empty this file gradually by fixing the underlying issues and removing suppresions.
|
||||
|
||||
# Boost ASIO / Boost Context false positives
|
||||
# These are internal to Boost's reactor, fiber context switching, and memory management.
|
||||
# Suppress race in Boost ASIO scheduler detected by GCC-15
|
||||
# This is a false positive in Boost's internal pipe() synchronization
|
||||
race:boost/asio/
|
||||
race:boost/context/
|
||||
race:boost/asio/executor.hpp
|
||||
race:boost::asio
|
||||
race:boost::context::fiber::resume
|
||||
race:boost::asio::detail::spawned_fiber_thread
|
||||
race:boost::asio::detail::spawned_fiber_thread::suspend_with
|
||||
race:boost::asio::detail::spawned_fiber_thread::destroy
|
||||
race:__cxxabiv1::manage_exception_state
|
||||
race:__tsan_memcpy
|
||||
|
||||
# TSAN's own syscall interceptors (false positives from fd tracking)
|
||||
# Suppress tsan related issues in rippled code.
|
||||
race:src/libxrpl/basics/make_SSLContext.cpp
|
||||
race:src/libxrpl/basics/Number.cpp
|
||||
race:src/libxrpl/json/json_value.cpp
|
||||
race:src/libxrpl/json/to_string.cpp
|
||||
race:src/libxrpl/ledger/OpenView.cpp
|
||||
race:src/libxrpl/net/HTTPClient.cpp
|
||||
race:src/libxrpl/nodestore/backend/NuDBFactory.cpp
|
||||
race:src/libxrpl/protocol/InnerObjectFormats.cpp
|
||||
race:src/libxrpl/protocol/STParsedJSON.cpp
|
||||
race:src/libxrpl/resource/ResourceManager.cpp
|
||||
race:src/test/app/Flow_test.cpp
|
||||
race:src/test/app/LedgerReplay_test.cpp
|
||||
race:src/test/app/NFToken_test.cpp
|
||||
race:src/test/app/Offer_test.cpp
|
||||
race:src/test/app/ValidatorSite_test.cpp
|
||||
race:src/test/consensus/NegativeUNL_test.cpp
|
||||
race:src/test/jtx/impl/Env.cpp
|
||||
race:src/test/jtx/impl/JSONRPCClient.cpp
|
||||
race:src/test/jtx/impl/pay.cpp
|
||||
race:src/test/jtx/impl/token.cpp
|
||||
race:src/test/rpc/Book_test.cpp
|
||||
race:src/xrpld/app/ledger/detail/InboundTransactions.cpp
|
||||
race:src/xrpld/app/main/Application.cpp
|
||||
race:src/xrpld/app/main/BasicApp.cpp
|
||||
race:src/xrpld/app/main/GRPCServer.cpp
|
||||
race:src/xrpld/app/misc/detail/AmendmentTable.cpp
|
||||
race:src/xrpld/app/misc/FeeVoteImpl.cpp
|
||||
race:src/xrpld/app/rdb/detail/Wallet.cpp
|
||||
race:src/xrpld/overlay/detail/OverlayImpl.cpp
|
||||
race:src/xrpld/peerfinder/detail/PeerfinderManager.cpp
|
||||
race:src/xrpld/peerfinder/detail/SourceStrings.cpp
|
||||
race:src/xrpld/rpc/detail/ServerHandler.cpp
|
||||
race:xrpl/server/detail/Door.h
|
||||
race:xrpl/server/detail/Spawn.h
|
||||
race:xrpl/server/detail/ServerImpl.h
|
||||
race:xrpl/nodestore/detail/DatabaseNodeImp.h
|
||||
race:src/libxrpl/beast/utility/beast_Journal.cpp
|
||||
race:src/test/beast/LexicalCast_test.cpp
|
||||
race:ripple::ServerHandler
|
||||
|
||||
# More suppressions in external library code.
|
||||
race:crtstuff.c
|
||||
race:pipe
|
||||
race:epoll_ctl
|
||||
race:epoll_create
|
||||
race:closedir
|
||||
race:close
|
||||
race:socket
|
||||
race:accept
|
||||
race:eventfd
|
||||
|
||||
# C++ standard library internals
|
||||
race:ostreambuf_iterator
|
||||
race:basic_ostream
|
||||
called_from_lib:libclang_rt
|
||||
# Deadlock / lock-order-inversion suppressions
|
||||
# Note: GCC's TSAN may not fully support all deadlock suppression patterns
|
||||
deadlock:src/libxrpl/beast/utility/beast_Journal.cpp
|
||||
deadlock:src/libxrpl/beast/utility/beast_PropertyStream.cpp
|
||||
deadlock:src/test/beast/beast_PropertyStream_test.cpp
|
||||
deadlock:src/xrpld/core/detail/Workers.cpp
|
||||
deadlock:src/xrpld/app/misc/detail/Manifest.cpp
|
||||
deadlock:src/xrpld/app/misc/detail/ValidatorList.cpp
|
||||
deadlock:src/xrpld/app/misc/detail/ValidatorSite.cpp
|
||||
|
||||
# Deadlock false positives in Boost and threading primitives
|
||||
signal:src/libxrpl/beast/utility/beast_Journal.cpp
|
||||
signal:src/xrpld/core/detail/Workers.cpp
|
||||
signal:src/xrpld/core/JobQueue.cpp
|
||||
signal:ripple::Workers::Worker
|
||||
|
||||
# Aggressive suppressing of deadlock tsan errors
|
||||
deadlock:pthread_create
|
||||
deadlock:pthread_rwlock_rdlock
|
||||
deadlock:boost::asio
|
||||
|
||||
# Signal/crash suppressions for GCC TSAN instrumentation issues
|
||||
# Suppress SEGV crashes in TSAN itself during stringbuf operations
|
||||
# This appears to be a GCC-15 TSAN instrumentation issue with basic_stringbuf::str()
|
||||
# Commonly triggered in beast::Journal::ScopedStream destructor
|
||||
signal:std::__cxx11::basic_stringbuf
|
||||
signal:basic_stringbuf
|
||||
signal:basic_ostringstream
|
||||
|
||||
called_from_lib:libclang_rt
|
||||
race:ostreambuf_iterator
|
||||
race:basic_ostream
|
||||
|
||||
# Suppress SEGV in Boost ASIO memory allocation with GCC-15 TSAN
|
||||
signal:boost::asio::aligned_new
|
||||
signal:boost::asio::detail::memory
|
||||
|
||||
# Suppress SEGV in execute_native_thread_routine
|
||||
signal:execute_native_thread_routine
|
||||
|
||||
# Suppress data race in Boost Context fiber management
|
||||
# This is a false positive in Boost's exception state management during fiber context switching
|
||||
race:__cxxabiv1::manage_exception_state
|
||||
race:boost::context::fiber::resume
|
||||
race:boost::asio::detail::spawned_fiber_thread
|
||||
race:boost::asio::detail::spawned_fiber_thread::suspend_with
|
||||
race:boost::asio::detail::spawned_fiber_thread::destroy
|
||||
|
||||
# Suppress data race in __tsan_memcpy called from Boost fiber operations
|
||||
race:__tsan_memcpy
|
||||
|
||||
@@ -168,7 +168,7 @@ decode(void* dest, char const* src, std::size_t len)
|
||||
break;
|
||||
++in;
|
||||
c4[i] = v;
|
||||
if (++i; i == 4)
|
||||
if (++i == 4)
|
||||
{
|
||||
c3[0] = (c4[0] << 2) + ((c4[1] & 0x30) >> 4);
|
||||
c3[1] = ((c4[1] & 0xf) << 4) + ((c4[2] & 0x3c) >> 2);
|
||||
|
||||
@@ -729,18 +729,21 @@ Reader::decodeUnicodeCodePoint(Token& token, Location& current, Location end, un
|
||||
|
||||
unsigned int surrogatePair;
|
||||
|
||||
if (*current != '\\' || *(current + 1) != 'u')
|
||||
if (*(current++) == '\\' && *(current++) == 'u')
|
||||
{
|
||||
if (decodeUnicodeEscapeSequence(token, current, end, surrogatePair))
|
||||
{
|
||||
unicode = 0x10000 + ((unicode & 0x3FF) << 10) + (surrogatePair & 0x3FF);
|
||||
}
|
||||
else
|
||||
return false;
|
||||
}
|
||||
else
|
||||
return addError(
|
||||
"expecting another \\u token to begin the second half of a unicode surrogate pair",
|
||||
"expecting another \\u token to begin the second half of a "
|
||||
"unicode surrogate pair",
|
||||
token,
|
||||
current);
|
||||
|
||||
current += 2; // skip two characters checked above
|
||||
|
||||
if (!decodeUnicodeEscapeSequence(token, current, end, surrogatePair))
|
||||
return false;
|
||||
|
||||
unicode = 0x10000 + ((unicode & 0x3FF) << 10) + (surrogatePair & 0x3FF);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
@@ -319,7 +319,7 @@ StyledWriter::writeValue(Value const& value)
|
||||
document_ += " : ";
|
||||
writeValue(childValue);
|
||||
|
||||
if (++it; it == members.end())
|
||||
if (++it == members.end())
|
||||
break;
|
||||
|
||||
document_ += ",";
|
||||
|
||||
@@ -74,10 +74,8 @@ BookDirs::const_iterator::operator++()
|
||||
XRPL_ASSERT(index_ != zero, "xrpl::BookDirs::const_iterator::operator++ : nonzero index");
|
||||
if (!cdirNext(*view_, cur_key_, sle_, entry_, index_))
|
||||
{
|
||||
if (index_ == 0)
|
||||
cur_key_ = view_->succ(++cur_key_, next_quality_).value_or(zero);
|
||||
|
||||
if (index_ != 0 || cur_key_ == zero)
|
||||
if (index_ != 0 ||
|
||||
(cur_key_ = view_->succ(++cur_key_, next_quality_).value_or(zero)) == zero)
|
||||
{
|
||||
cur_key_ = key_;
|
||||
entry_ = 0;
|
||||
@@ -86,7 +84,9 @@ BookDirs::const_iterator::operator++()
|
||||
else if (!cdirFirst(*view_, cur_key_, sle_, entry_, index_))
|
||||
{
|
||||
// LCOV_EXCL_START
|
||||
UNREACHABLE("xrpl::BookDirs::const_iterator::operator++ : directory is empty");
|
||||
UNREACHABLE(
|
||||
"xrpl::BookDirs::const_iterator::operator++ : directory is "
|
||||
"empty");
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,9 +23,6 @@ STBase::STBase(SField const& n) : fName(&n)
|
||||
STBase&
|
||||
STBase::operator=(STBase const& t)
|
||||
{
|
||||
if (this == &t)
|
||||
return *this;
|
||||
|
||||
if (!fName->isUseful())
|
||||
fName = t.fName;
|
||||
return *this;
|
||||
|
||||
@@ -39,9 +39,6 @@ Consumer::~Consumer()
|
||||
Consumer&
|
||||
Consumer::operator=(Consumer const& other)
|
||||
{
|
||||
if (this == &other)
|
||||
return *this;
|
||||
|
||||
// remove old ref
|
||||
if (m_logic && m_entry)
|
||||
m_logic->release(*m_entry);
|
||||
|
||||
@@ -178,7 +178,7 @@ SHAMap::finishFetch(SHAMapHash const& hash, std::shared_ptr<NodeObject> const& o
|
||||
|
||||
// See if a sync filter has a node
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
SHAMap::checkFilter(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
|
||||
SHAMap::checkFilter(SHAMapHash const& hash, SHAMapSyncFilter const* filter) const
|
||||
{
|
||||
if (auto nodeData = filter->getNode(hash))
|
||||
{
|
||||
@@ -204,7 +204,7 @@ SHAMap::checkFilter(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
|
||||
// Get a node without throwing
|
||||
// Used on maps where missing nodes are expected
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
|
||||
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter const* filter) const
|
||||
{
|
||||
auto node = cacheLookup(hash);
|
||||
if (node)
|
||||
@@ -317,7 +317,7 @@ SHAMap::descend(
|
||||
SHAMapInnerNode* parent,
|
||||
SHAMapNodeID const& parentID,
|
||||
int branch,
|
||||
SHAMapSyncFilter* filter) const
|
||||
SHAMapSyncFilter const* filter) const
|
||||
{
|
||||
XRPL_ASSERT(parent->isInner(), "xrpl::SHAMap::descend : valid parent input");
|
||||
XRPL_ASSERT(
|
||||
@@ -346,7 +346,7 @@ SHAMapTreeNode*
|
||||
SHAMap::descendAsync(
|
||||
SHAMapInnerNode* parent,
|
||||
int branch,
|
||||
SHAMapSyncFilter* filter,
|
||||
SHAMapSyncFilter const* filter,
|
||||
bool& pending,
|
||||
descendCallback&& callback) const
|
||||
{
|
||||
|
||||
@@ -118,7 +118,9 @@ selectBranch(SHAMapNodeID const& id, uint256 const& hash)
|
||||
SHAMapNodeID
|
||||
SHAMapNodeID::createID(int depth, uint256 const& key)
|
||||
{
|
||||
XRPL_ASSERT((depth >= 0) && (depth < 65), "xrpl::SHAMapNodeID::createID : valid branch input");
|
||||
XRPL_ASSERT(
|
||||
depth >= 0 && depth <= SHAMap::leafDepth,
|
||||
"xrpl::SHAMapNodeID::createID : valid branch input");
|
||||
return SHAMapNodeID(depth, key & depthMask(depth));
|
||||
}
|
||||
|
||||
|
||||
@@ -386,7 +386,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
|
||||
bool
|
||||
SHAMap::getNodeFat(
|
||||
SHAMapNodeID const& wanted,
|
||||
std::vector<std::pair<SHAMapNodeID, Blob>>& data,
|
||||
std::vector<std::tuple<SHAMapNodeID, Blob, bool>>& data,
|
||||
bool fatLeaves,
|
||||
std::uint32_t depth) const
|
||||
{
|
||||
@@ -432,7 +432,7 @@ SHAMap::getNodeFat(
|
||||
// Add this node to the reply
|
||||
s.erase();
|
||||
node->serializeForWire(s);
|
||||
data.emplace_back(std::make_pair(nodeID, s.getData()));
|
||||
data.emplace_back(std::make_tuple(nodeID, s.getData(), node->isLeaf()));
|
||||
|
||||
if (node->isInner())
|
||||
{
|
||||
@@ -462,7 +462,8 @@ SHAMap::getNodeFat(
|
||||
// Just include this node
|
||||
s.erase();
|
||||
childNode->serializeForWire(s);
|
||||
data.emplace_back(std::make_pair(childID, s.getData()));
|
||||
data.emplace_back(
|
||||
std::make_tuple(childID, s.getData(), childNode->isLeaf()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -480,8 +481,18 @@ SHAMap::serializeRoot(Serializer& s) const
|
||||
}
|
||||
|
||||
SHAMapAddNode
|
||||
SHAMap::addRootNode(SHAMapHash const& hash, Slice const& rootNode, SHAMapSyncFilter* filter)
|
||||
SHAMap::addRootNode(
|
||||
SHAMapHash const& hash,
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode> rootNode,
|
||||
SHAMapSyncFilter const* filter)
|
||||
{
|
||||
XRPL_ASSERT(rootNode, "xrpl::SHAMap::addRootNode : non-null root node");
|
||||
if (!rootNode)
|
||||
{
|
||||
JLOG(journal_.error()) << "Null node received";
|
||||
return SHAMapAddNode::invalid();
|
||||
}
|
||||
|
||||
// we already have a root_ node
|
||||
if (root_->getHash().isNonZero())
|
||||
{
|
||||
@@ -491,14 +502,16 @@ SHAMap::addRootNode(SHAMapHash const& hash, Slice const& rootNode, SHAMapSyncFil
|
||||
}
|
||||
|
||||
XRPL_ASSERT(cowid_ >= 1, "xrpl::SHAMap::addRootNode : valid cowid");
|
||||
auto node = SHAMapTreeNode::makeFromWire(rootNode);
|
||||
if (!node || node->getHash() != hash)
|
||||
if (rootNode->getHash() != hash)
|
||||
{
|
||||
JLOG(journal_.warn()) << "Corrupt node received";
|
||||
return SHAMapAddNode::invalid();
|
||||
}
|
||||
|
||||
if (backed_)
|
||||
canonicalize(hash, node);
|
||||
canonicalize(hash, rootNode);
|
||||
|
||||
root_ = node;
|
||||
root_ = std::move(rootNode);
|
||||
|
||||
if (root_->isLeaf())
|
||||
clearSynching();
|
||||
@@ -515,9 +528,18 @@ SHAMap::addRootNode(SHAMapHash const& hash, Slice const& rootNode, SHAMapSyncFil
|
||||
}
|
||||
|
||||
SHAMapAddNode
|
||||
SHAMap::addKnownNode(SHAMapNodeID const& node, Slice const& rawNode, SHAMapSyncFilter* filter)
|
||||
SHAMap::addKnownNode(
|
||||
SHAMapNodeID const& nodeID,
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode> treeNode,
|
||||
SHAMapSyncFilter const* filter)
|
||||
{
|
||||
XRPL_ASSERT(!node.isRoot(), "xrpl::SHAMap::addKnownNode : valid node input");
|
||||
XRPL_ASSERT(!nodeID.isRoot(), "xrpl::SHAMap::addKnownNode : valid node input");
|
||||
XRPL_ASSERT(treeNode, "xrpl::SHAMap::addKnownNode : non-null tree node");
|
||||
if (!treeNode)
|
||||
{
|
||||
JLOG(journal_.error()) << "Null node received";
|
||||
return SHAMapAddNode::invalid();
|
||||
}
|
||||
|
||||
if (!isSynching())
|
||||
{
|
||||
@@ -531,14 +553,14 @@ SHAMap::addKnownNode(SHAMapNodeID const& node, Slice const& rawNode, SHAMapSyncF
|
||||
|
||||
while (currNode->isInner() &&
|
||||
!static_cast<SHAMapInnerNode*>(currNode)->isFullBelow(generation) &&
|
||||
(currNodeID.getDepth() < node.getDepth()))
|
||||
(currNodeID.getDepth() < nodeID.getDepth()))
|
||||
{
|
||||
int const branch = selectBranch(currNodeID, node.getNodeID());
|
||||
int const branch = selectBranch(currNodeID, nodeID.getNodeID());
|
||||
XRPL_ASSERT(branch >= 0, "xrpl::SHAMap::addKnownNode : valid branch");
|
||||
auto inner = static_cast<SHAMapInnerNode*>(currNode);
|
||||
if (inner->isEmptyBranch(branch))
|
||||
{
|
||||
JLOG(journal_.warn()) << "Add known node for empty branch" << node;
|
||||
JLOG(journal_.warn()) << "Add known node for empty branch" << nodeID;
|
||||
return SHAMapAddNode::invalid();
|
||||
}
|
||||
|
||||
@@ -554,67 +576,44 @@ SHAMap::addKnownNode(SHAMapNodeID const& node, Slice const& rawNode, SHAMapSyncF
|
||||
if (currNode != nullptr)
|
||||
continue;
|
||||
|
||||
auto newNode = SHAMapTreeNode::makeFromWire(rawNode);
|
||||
|
||||
if (!newNode || childHash != newNode->getHash())
|
||||
if (childHash != treeNode->getHash())
|
||||
{
|
||||
JLOG(journal_.warn()) << "Corrupt node received";
|
||||
return SHAMapAddNode::invalid();
|
||||
}
|
||||
|
||||
// In rare cases, a node can still be corrupt even after hash
|
||||
// validation. For leaf nodes, we perform an additional check to
|
||||
// ensure the node's position in the tree is consistent with its
|
||||
// content to prevent inconsistencies that could
|
||||
// propagate further down the line.
|
||||
if (newNode->isLeaf())
|
||||
{
|
||||
auto const& actualKey =
|
||||
static_cast<SHAMapLeafNode const*>(newNode.get())->peekItem()->key();
|
||||
|
||||
// Validate that this leaf belongs at the target position
|
||||
auto const expectedNodeID = SHAMapNodeID::createID(node.getDepth(), actualKey);
|
||||
if (expectedNodeID.getNodeID() != node.getNodeID())
|
||||
{
|
||||
JLOG(journal_.debug())
|
||||
<< "Leaf node position mismatch: "
|
||||
<< "expected=" << expectedNodeID.getNodeID() << ", actual=" << node.getNodeID();
|
||||
return SHAMapAddNode::invalid();
|
||||
}
|
||||
}
|
||||
|
||||
// Inner nodes must be at a level strictly less than 64
|
||||
// but leaf nodes (while notionally at level 64) can be
|
||||
// at any depth up to and including 64:
|
||||
if ((currNodeID.getDepth() > leafDepth) ||
|
||||
(newNode->isInner() && currNodeID.getDepth() == leafDepth))
|
||||
(treeNode->isInner() && currNodeID.getDepth() == leafDepth))
|
||||
{
|
||||
// Map is provably invalid
|
||||
state_ = SHAMapState::Invalid;
|
||||
return SHAMapAddNode::useful();
|
||||
}
|
||||
|
||||
if (currNodeID != node)
|
||||
if (currNodeID != nodeID)
|
||||
{
|
||||
// Either this node is broken or we didn't request it (yet)
|
||||
JLOG(journal_.warn()) << "unable to hook node " << node;
|
||||
JLOG(journal_.warn()) << "unable to hook node " << nodeID;
|
||||
JLOG(journal_.info()) << " stuck at " << currNodeID;
|
||||
JLOG(journal_.info()) << "got depth=" << node.getDepth()
|
||||
JLOG(journal_.info()) << "got depth=" << nodeID.getDepth()
|
||||
<< ", walked to= " << currNodeID.getDepth();
|
||||
return SHAMapAddNode::useful();
|
||||
}
|
||||
|
||||
if (backed_)
|
||||
canonicalize(childHash, newNode);
|
||||
canonicalize(childHash, treeNode);
|
||||
|
||||
newNode = prevNode->canonicalizeChild(branch, std::move(newNode));
|
||||
treeNode = prevNode->canonicalizeChild(branch, std::move(treeNode));
|
||||
|
||||
if (filter)
|
||||
{
|
||||
Serializer s;
|
||||
newNode->serializeWithPrefix(s);
|
||||
treeNode->serializeWithPrefix(s);
|
||||
filter->gotNode(
|
||||
false, childHash, ledgerSeq_, std::move(s.modData()), newNode->getType());
|
||||
false, childHash, ledgerSeq_, std::move(s.modData()), treeNode->getType());
|
||||
}
|
||||
|
||||
return SHAMapAddNode::useful();
|
||||
|
||||
358
src/test/app/LedgerNodeHelpers_test.cpp
Normal file
358
src/test/app/LedgerNodeHelpers_test.cpp
Normal file
@@ -0,0 +1,358 @@
|
||||
#include <test/shamap/common.h>
|
||||
|
||||
#include <xrpld/app/ledger/detail/LedgerNodeHelpers.h>
|
||||
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
#include <xrpl/shamap/SHAMap.h>
|
||||
#include <xrpl/shamap/SHAMapAccountStateLeafNode.h>
|
||||
#include <xrpl/shamap/SHAMapInnerNode.h>
|
||||
#include <xrpl/shamap/SHAMapItem.h>
|
||||
#include <xrpl/shamap/SHAMapTreeNode.h>
|
||||
|
||||
#include <bit>
|
||||
|
||||
namespace xrpl {
|
||||
namespace tests {
|
||||
|
||||
class LedgerNodeHelpers_test : public beast::unit_test::suite
|
||||
{
|
||||
// Helper function to create a simple SHAMapItem for testing.
|
||||
static boost::intrusive_ptr<SHAMapItem>
|
||||
makeTestItem(std::uint32_t seed)
|
||||
{
|
||||
Serializer s;
|
||||
s.add32(seed);
|
||||
s.add32(seed + 1);
|
||||
s.add32(seed + 2);
|
||||
return make_shamapitem(s.getSHA512Half(), s.slice());
|
||||
}
|
||||
|
||||
// Helper function to serialize a tree node to wire format.
|
||||
static std::string
|
||||
serializeNode(intr_ptr::SharedPtr<SHAMapTreeNode> const& node)
|
||||
{
|
||||
Serializer s;
|
||||
node->serializeForWire(s);
|
||||
auto const slice = s.slice();
|
||||
return std::string(std::bit_cast<char const*>(slice.data()), slice.size());
|
||||
}
|
||||
|
||||
void
|
||||
testValidateLedgerNode()
|
||||
{
|
||||
// In the tests below the validity of the content of the node data and ID fields is not
|
||||
// checked - only that the fields have values when expected. The content of the fields is
|
||||
// verified in the other tests in this file.
|
||||
testcase("validateLedgerNode");
|
||||
|
||||
// Invalid: missing all fields.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Invalid: missing `nodedata` field.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodeid("test_nodeid");
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Invalid: missing `nodedata` field.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_id("test_nodeid");
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Invalid: missing `nodedata` field.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_depth(1);
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Valid: legacy `nodeid` field.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_nodeid("test_nodeid");
|
||||
BEAST_EXPECT(validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Invalid: has both legacy `nodeid` and new `id` fields.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_nodeid("test_nodeid");
|
||||
node.set_id("test_nodeid");
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Invalid: has both legacy `nodeid` and new `depth` fields.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_nodeid("test_nodeid");
|
||||
node.set_depth(5);
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Valid: new `id` field.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_id("test_id");
|
||||
BEAST_EXPECT(validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Valid: new `depth` field.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_depth(5);
|
||||
BEAST_EXPECT(validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Valid: `depth` at minimum depth.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_depth(0);
|
||||
BEAST_EXPECT(validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Valid: `depth` at arbitrary depth between minimum and maximum.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_depth(10);
|
||||
BEAST_EXPECT(validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Valid: `depth` at maximum depth.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_depth(SHAMap::leafDepth);
|
||||
BEAST_EXPECT(validateLedgerNode(node));
|
||||
}
|
||||
|
||||
// Invalid: `depth` is greater than maximum depth.
|
||||
{
|
||||
protocol::TMLedgerNode node;
|
||||
node.set_nodedata("test_data");
|
||||
node.set_depth(SHAMap::leafDepth + 1);
|
||||
BEAST_EXPECT(!validateLedgerNode(node));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
testGetTreeNode()
|
||||
{
|
||||
testcase("getTreeNode");
|
||||
|
||||
// Valid: inner node. It must have at least one child for `serializeNode` to work.
|
||||
{
|
||||
auto const innerNode = intr_ptr::make_shared<SHAMapInnerNode>(1);
|
||||
auto const childNode = intr_ptr::make_shared<SHAMapInnerNode>(1);
|
||||
innerNode->setChild(0, childNode);
|
||||
auto const innerData = serializeNode(innerNode);
|
||||
auto const result = getTreeNode(innerData);
|
||||
BEAST_EXPECT(result.has_value());
|
||||
BEAST_EXPECT((*result)->isInner());
|
||||
}
|
||||
|
||||
// Valid: leaf node.
|
||||
{
|
||||
auto const leafItem = makeTestItem(12345);
|
||||
auto const leafNode =
|
||||
intr_ptr::make_shared<SHAMapAccountStateLeafNode>(std::move(leafItem), 1);
|
||||
auto const leafData = serializeNode(leafNode);
|
||||
auto result = getTreeNode(leafData);
|
||||
BEAST_EXPECT(result.has_value());
|
||||
BEAST_EXPECT((*result)->isLeaf());
|
||||
}
|
||||
|
||||
// Invalid: empty data.
|
||||
{
|
||||
auto const result = getTreeNode("");
|
||||
BEAST_EXPECT(!result.has_value());
|
||||
}
|
||||
|
||||
// Invalid: garbage data.
|
||||
{
|
||||
auto const result = getTreeNode("invalid");
|
||||
BEAST_EXPECT(!result.has_value());
|
||||
}
|
||||
|
||||
// Invalid: truncated data.
|
||||
{
|
||||
auto const leafItem = makeTestItem(54321);
|
||||
auto const leafNode =
|
||||
intr_ptr::make_shared<SHAMapAccountStateLeafNode>(std::move(leafItem), 1);
|
||||
// Truncate the data to trigger an exception in SHAMapTreeNode::makeAccountState when
|
||||
// the data is used to deserialize the node.
|
||||
uint256 tag;
|
||||
auto const leafData = serializeNode(leafNode).substr(0, tag.bytes - 1);
|
||||
auto const result = getTreeNode(leafData);
|
||||
BEAST_EXPECT(!result.has_value());
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Exercises getSHAMapNodeID(), which reconstructs a SHAMapNodeID for a
 * deserialized tree node from the fields of a protocol::TMLedgerNode
 * message.
 *
 * Field combinations covered:
 *  - legacy `nodeid` (raw node-ID string): accepted for inner and leaf
 *    nodes at any depth, but must be consistent with a leaf's key;
 *  - new `id` (raw node-ID string): accepted for inner nodes only;
 *  - new `depth` (integer): accepted for leaf nodes only, from depth 0
 *    up to SHAMap::leafDepth.
 * Every mismatched combination is expected to yield an empty optional.
 */
void
testGetSHAMapNodeID()
{
    testcase("getSHAMapNodeID");

    {
        // Tests using inner nodes at various depths.
        auto const innerNode = intr_ptr::make_shared<SHAMapInnerNode>(1);
        auto const childNode = intr_ptr::make_shared<SHAMapInnerNode>(1);
        innerNode->setChild(0, childNode);
        auto const innerData = serializeNode(innerNode);

        // Valid: legacy `nodeid` field at arbitrary depth.
        {
            auto const innerDepth = 3;
            auto const innerID = SHAMapNodeID::createID(innerDepth, uint256{});

            protocol::TMLedgerNode node;
            node.set_nodedata(innerData);
            node.set_nodeid(innerID.getRawString());
            auto const result = getSHAMapNodeID(node, innerNode);
            BEAST_EXPECT(result.has_value());
            BEAST_EXPECT(*result == innerID);
        }

        // Valid: new `id` field at minimum depth.
        {
            auto const innerDepth = 0;
            auto const innerID = SHAMapNodeID::createID(innerDepth, uint256{});

            protocol::TMLedgerNode node;
            node.set_nodedata(innerData);
            node.set_id(innerID.getRawString());
            auto const result = getSHAMapNodeID(node, innerNode);
            BEAST_EXPECT(result.has_value());
            BEAST_EXPECT(*result == innerID);
        }

        // Invalid: new `depth` field should not be used for inner nodes.
        {
            protocol::TMLedgerNode node;
            node.set_nodedata(innerData);
            node.set_depth(10);
            auto const result = getSHAMapNodeID(node, innerNode);
            BEAST_EXPECT(!result.has_value());
        }
    }

    {
        // Tests using leaf nodes at various depths.
        auto const leafItem = makeTestItem(12345);
        auto const leafNode = intr_ptr::make_shared<SHAMapAccountStateLeafNode>(leafItem, 1);
        auto const leafData = serializeNode(leafNode);
        auto const leafKey = leafItem->key();

        // Valid: legacy `nodeid` field at arbitrary depth.
        {
            auto const leafDepth = 5;
            auto const leafID = SHAMapNodeID::createID(leafDepth, leafKey);

            protocol::TMLedgerNode ledgerNode;
            ledgerNode.set_nodedata(leafData);
            ledgerNode.set_nodeid(leafID.getRawString());
            auto result = getSHAMapNodeID(ledgerNode, leafNode);
            BEAST_EXPECT(result.has_value());
            BEAST_EXPECT(*result == leafID);
        }

        // Invalid: new `id` field should not be used for leaf nodes.
        {
            auto const leafDepth = 5;
            auto const leafID = SHAMapNodeID::createID(leafDepth, leafKey);

            protocol::TMLedgerNode ledgerNode;
            ledgerNode.set_nodedata(leafData);
            ledgerNode.set_id(leafID.getRawString());
            auto result = getSHAMapNodeID(ledgerNode, leafNode);
            BEAST_EXPECT(!result.has_value());
        }

        // Valid: new `depth` field at minimum depth.
        {
            auto const leafDepth = 0;
            auto const leafID = SHAMapNodeID::createID(leafDepth, leafKey);

            protocol::TMLedgerNode node;
            node.set_nodedata(leafData);
            node.set_depth(leafDepth);
            auto result = getSHAMapNodeID(node, leafNode);
            BEAST_EXPECT(result.has_value());
            BEAST_EXPECT(*result == leafID);
        }

        // Valid: new `depth` field at arbitrary depth between minimum and maximum.
        {
            auto const leafDepth = 10;
            auto const leafID = SHAMapNodeID::createID(leafDepth, leafKey);

            protocol::TMLedgerNode ledgerNode;
            ledgerNode.set_nodedata(leafData);
            ledgerNode.set_depth(leafDepth);
            auto result = getSHAMapNodeID(ledgerNode, leafNode);
            BEAST_EXPECT(result.has_value());
            BEAST_EXPECT(*result == leafID);
        }

        // Valid: new `depth` field at maximum depth.
        // Note that we do not test a depth greater than the maximum depth, because the proto
        // message is assumed to have been validated by the time the getSHAMapNodeID function is
        // called.
        {
            auto const leafDepth = SHAMap::leafDepth;
            auto const leafID = SHAMapNodeID::createID(leafDepth, leafKey);

            protocol::TMLedgerNode node;
            node.set_nodedata(leafData);
            node.set_depth(leafDepth);
            auto result = getSHAMapNodeID(node, leafNode);
            BEAST_EXPECT(result.has_value());
            BEAST_EXPECT(*result == leafID);
        }

        // Invalid: legacy `nodeid` field where the node ID is inconsistent with the key.
        {
            auto const otherItem = makeTestItem(54321);
            auto const otherNode =
                intr_ptr::make_shared<SHAMapAccountStateLeafNode>(otherItem, 1);
            auto const otherData = serializeNode(otherNode);
            auto const otherKey = otherItem->key();
            auto const otherDepth = 1;
            auto const otherID = SHAMapNodeID::createID(otherDepth, otherKey);

            protocol::TMLedgerNode ledgerNode;
            ledgerNode.set_nodedata(otherData);
            ledgerNode.set_nodeid(otherID.getRawString());
            // The message carries otherNode's data/ID but is paired with
            // leafNode, whose key does not match: lookup must fail.
            auto result = getSHAMapNodeID(ledgerNode, leafNode);
            BEAST_EXPECT(!result.has_value());
        }
    }
}
|
||||
|
||||
public:
|
||||
/// Entry point invoked by the test framework: runs each sub-test in turn.
void
run() override
{
    testValidateLedgerNode();
    testGetTreeNode();
    testGetSHAMapNodeID();
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(LedgerNodeHelpers, app, xrpl);
|
||||
|
||||
} // namespace tests
|
||||
} // namespace xrpl
|
||||
@@ -8,7 +8,6 @@
|
||||
#include <xrpld/rpc/detail/Tuning.h>
|
||||
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/protocol/ApiVersion.h>
|
||||
@@ -132,6 +131,7 @@ public:
|
||||
c,
|
||||
Role::USER,
|
||||
{},
|
||||
{},
|
||||
RPC::apiVersionIfUnspecified},
|
||||
{},
|
||||
{}};
|
||||
@@ -155,11 +155,11 @@ public:
|
||||
|
||||
Json::Value result;
|
||||
gate g;
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
context.params = std::move(params);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -240,27 +240,28 @@ public:
|
||||
c,
|
||||
Role::USER,
|
||||
{},
|
||||
{},
|
||||
RPC::apiVersionIfUnspecified},
|
||||
{},
|
||||
{}};
|
||||
Json::Value result;
|
||||
gate g;
|
||||
// Test RPC::Tuning::max_src_cur source currencies.
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(!result.isMember(jss::error));
|
||||
|
||||
// Test more than RPC::Tuning::max_src_cur source currencies.
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(result.isMember(jss::error));
|
||||
@@ -268,22 +269,22 @@ public:
|
||||
// Test RPC::Tuning::max_auto_src_cur source currencies.
|
||||
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
|
||||
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
context.params = rpf(Account("alice"), Account("bob"), 0);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(!result.isMember(jss::error));
|
||||
|
||||
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
|
||||
env.trust(Account("alice")["AUD"](100), "bob");
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
context.params = rpf(Account("alice"), Account("bob"), 0);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(result.isMember(jss::error));
|
||||
|
||||
@@ -1,574 +0,0 @@
|
||||
#include <test/jtx.h>
|
||||
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/core/JobQueueAwaiter.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
|
||||
namespace xrpl {
|
||||
namespace test {
|
||||
|
||||
/**
|
||||
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
|
||||
* and JobQueueAwaiter.
|
||||
*
|
||||
* Dependency Diagram
|
||||
* ==================
|
||||
*
|
||||
* CoroTask_test
|
||||
* +-------------------------------------------------+
|
||||
* | + gate (inner class) : condition_variable helper |
|
||||
* +-------------------------------------------------+
|
||||
* | uses
|
||||
* v
|
||||
* jtx::Env --> JobQueue::postCoroTask()
|
||||
* |
|
||||
* +-- CoroTaskRunner (suspend / post / resume)
|
||||
* +-- CoroTask<void> / CoroTask<T>
|
||||
* +-- JobQueueAwaiter
|
||||
*
|
||||
* Test Coverage Matrix
|
||||
* ====================
|
||||
*
|
||||
* Test | Primitives exercised
|
||||
* --------------------------+----------------------------------------------
|
||||
* testVoidCompletion | CoroTask<void> basic lifecycle
|
||||
* testCorrectOrder | suspend() -> join() -> post() -> complete
|
||||
* testIncorrectOrder | post() before suspend() (race-safe path)
|
||||
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
|
||||
* testThreadSpecificStorage | LocalValue isolation across coroutines
|
||||
* testExceptionPropagation | unhandled_exception() in promise_type
|
||||
* testMultipleYields | N sequential suspend/resume cycles
|
||||
* testValueReturn | CoroTask<T> co_return value
|
||||
* testValueException | CoroTask<T> exception via co_await
|
||||
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
|
||||
* testShutdownRejection | postCoroTask returns nullptr when stopping
|
||||
* testExpectEarlyExit | expectEarlyExit() with finished_ == false
|
||||
*/
|
||||
class CoroTask_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Simple one-shot gate for synchronizing between test thread
|
||||
* and coroutine worker threads. signal() sets the flag;
|
||||
* wait_for() blocks until signaled or timeout.
|
||||
*/
|
||||
/**
 * Auto-resetting rendezvous gate used to synchronize the test thread
 * with coroutine worker threads.
 *
 * signal() opens the gate and wakes every waiter; a successful
 * wait_for() closes it again, so the same object can be reused for
 * multiple consecutive rendezvous points.
 */
class gate
{
public:
    /**
     * Block until the gate is signaled or the timeout expires.
     *
     * @param rel_time Maximum duration to wait
     *
     * @return true if signaled before timeout
     */
    template <class Rep, class Period>
    bool
    wait_for(std::chrono::duration<Rep, Period> const& rel_time)
    {
        std::unique_lock<std::mutex> lock{mutex_};
        bool const wasSignaled =
            cv_.wait_for(lock, rel_time, [this] { return flag_; });
        // Reset so the gate is ready for the next rendezvous.
        flag_ = false;
        return wasSignaled;
    }

    /**
     * Open the gate, waking any waiting thread.
     */
    void
    signal()
    {
        std::lock_guard<std::mutex> lock{mutex_};
        flag_ = true;
        cv_.notify_all();
    }

private:
    std::condition_variable cv_;
    std::mutex mutex_;
    bool flag_ = false;
};
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
/**
|
||||
* CoroTask<void> runs to completion and runner becomes non-runnable.
|
||||
*/
|
||||
void
|
||||
testVoidCompletion()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("void completion");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* Correct order: suspend, join, post, complete.
|
||||
* Mirrors existing Coroutine_test::correct_order.
|
||||
*/
|
||||
void
|
||||
testCorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("correct order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> r;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTaskTest",
|
||||
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
|
||||
*rp = runner;
|
||||
g1p->signal();
|
||||
co_await runner->suspend();
|
||||
g2p->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g1.wait_for(5s));
|
||||
runner->join();
|
||||
runner->post();
|
||||
BEAST_EXPECT(g2.wait_for(5s));
|
||||
runner->join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Incorrect order: post() before suspend(). Verifies the
|
||||
* race-safe path. Mirrors Coroutine_test::incorrect_order.
|
||||
*/
|
||||
void
|
||||
testIncorrectOrder()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("incorrect order");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
|
||||
runner->post();
|
||||
co_await runner->suspend();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
}
|
||||
|
||||
/**
|
||||
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
|
||||
*/
|
||||
void
|
||||
testJobQueueAwaiter()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("JobQueueAwaiter");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int step = 0;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
|
||||
*sp = 1;
|
||||
co_await runner->yieldAndPost();
|
||||
*sp = 2;
|
||||
co_await runner->yieldAndPost();
|
||||
*sp = 3;
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
BEAST_EXPECT(step == 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-coroutine LocalValue isolation. Each coroutine sees its own
|
||||
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
|
||||
*/
|
||||
/**
 * Verify that LocalValue gives each coroutine (and each plain job) its
 * own isolated copy of "thread-local" state, and that values written
 * inside a job persist for later jobs while the test thread's own copy
 * is untouched. Mirrors Coroutine_test::thread_specific_storage.
 */
void
testThreadSpecificStorage()
{
    using namespace std::chrono_literals;
    using namespace jtx;

    testcase("thread specific storage");
    Env env(*this);

    auto& jq = env.app().getJobQueue();

    static int const N = 4;
    std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;

    LocalValue<int> lv(-1);
    BEAST_EXPECT(*lv == -1);

    // A plain job sees the default value and may overwrite its own copy
    // without affecting the test thread's copy.
    gate g;
    jq.addJob(jtCLIENT, "LocalValTest", [&]() {
        this->BEAST_EXPECT(*lv == -1);
        *lv = -2;
        this->BEAST_EXPECT(*lv == -2);
        g.signal();
    });
    BEAST_EXPECT(g.wait_for(5s));
    BEAST_EXPECT(*lv == -1);

    // Start N coroutines; each sees the default value, stores its own id,
    // and observes that id again after suspending and resuming.
    for (int i = 0; i < N; ++i)
    {
        jq.postCoroTask(
            jtCLIENT,
            "CoroTaskTest",
            [this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
                (*ap)[id] = runner;
                gp->signal();
                co_await runner->suspend();

                this->BEAST_EXPECT(**lvp == -1);
                **lvp = id;
                this->BEAST_EXPECT(**lvp == id);
                gp->signal();
                co_await runner->suspend();

                // The per-coroutine value survives across suspensions.
                this->BEAST_EXPECT(**lvp == id);
                co_return;
            });
        BEAST_EXPECT(g.wait_for(5s));
        a[i]->join();
    }
    // First resume: each coroutine writes and re-checks its id.
    for (auto const& r : a)
    {
        r->post();
        BEAST_EXPECT(g.wait_for(5s));
        r->join();
    }
    // Second resume: each coroutine runs its final segment to completion.
    for (auto const& r : a)
    {
        r->post();
        r->join();
    }

    // A later plain job still sees the value (-2) written by the first
    // job, while the test thread's copy remains the default.
    jq.addJob(jtCLIENT, "LocalValTest", [&]() {
        this->BEAST_EXPECT(*lv == -2);
        g.signal();
    });
    BEAST_EXPECT(g.wait_for(5s));
    BEAST_EXPECT(*lv == -1);
}
|
||||
|
||||
/**
|
||||
* Exception thrown in coroutine body is caught by
|
||||
* promise_type::unhandled_exception(). Coroutine completes.
|
||||
*/
|
||||
void
|
||||
testExceptionPropagation()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("exception propagation");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
|
||||
gp->signal();
|
||||
throw std::runtime_error("test exception");
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
// The exception is caught by promise_type::unhandled_exception()
|
||||
// and the coroutine is considered done
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* Multiple sequential suspend/resume cycles via co_await.
|
||||
*/
|
||||
/**
 * Walk a coroutine through several suspend/resume cycles and verify the
 * counter advances exactly once per resume, with the runner becoming
 * non-runnable only after the final segment.
 */
void
testMultipleYields()
{
    using namespace std::chrono_literals;
    using namespace jtx;

    testcase("multiple yields");

    Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
        cfg->FORCE_MULTI_THREAD = true;
        return cfg;
    }));

    gate g;
    int counter = 0;
    std::shared_ptr<JobQueue::CoroTaskRunner> r;
    auto runner = env.app().getJobQueue().postCoroTask(
        jtCLIENT,
        "CoroTaskTest",
        [rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
            *rp = runner;
            ++(*cp);
            gp->signal();
            co_await runner->suspend();
            ++(*cp);
            gp->signal();
            co_await runner->suspend();
            ++(*cp);
            gp->signal();
            co_return;
        });
    BEAST_EXPECT(runner);

    // First segment runs as soon as the posted job is scheduled.
    BEAST_EXPECT(g.wait_for(5s));
    BEAST_EXPECT(counter == 1);
    runner->join();

    // Each post() resumes exactly one more segment.
    runner->post();
    BEAST_EXPECT(g.wait_for(5s));
    BEAST_EXPECT(counter == 2);
    runner->join();

    runner->post();
    BEAST_EXPECT(g.wait_for(5s));
    BEAST_EXPECT(counter == 3);
    runner->join();
    BEAST_EXPECT(!runner->runnable());
}
|
||||
|
||||
/**
|
||||
* CoroTask<T> returns a value via co_return. Outer coroutine
|
||||
* extracts it with co_await.
|
||||
*/
|
||||
void
|
||||
testValueReturn()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value return");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> { co_return 42; };
|
||||
*rp = co_await inner();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 42);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* CoroTask<T> propagates exceptions from inner coroutines.
|
||||
* Outer coroutine catches via try/catch around co_await.
|
||||
*/
|
||||
void
|
||||
testValueException()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value exception");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
bool caught = false;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
|
||||
auto inner = []() -> CoroTask<int> {
|
||||
throw std::runtime_error("inner error");
|
||||
co_return 0;
|
||||
};
|
||||
try
|
||||
{
|
||||
co_await inner();
|
||||
}
|
||||
catch (std::runtime_error const& e)
|
||||
{
|
||||
*cp = true;
|
||||
}
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(caught);
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* CoroTask<T> chaining. Nested value-returning coroutines
|
||||
* compose via co_await.
|
||||
*/
|
||||
void
|
||||
testValueChaining()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("value chaining");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
gate g;
|
||||
int result = 0;
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
|
||||
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
|
||||
auto mul = [add](int a, int b) -> CoroTask<int> {
|
||||
int sum = co_await add(a, b);
|
||||
co_return sum * 2;
|
||||
};
|
||||
*rp = co_await mul(3, 4);
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
BEAST_EXPECT(runner);
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
runner->join();
|
||||
BEAST_EXPECT(result == 14); // (3 + 4) * 2
|
||||
BEAST_EXPECT(!runner->runnable());
|
||||
}
|
||||
|
||||
/**
|
||||
* postCoroTask returns nullptr when JobQueue is stopping.
|
||||
*/
|
||||
void
|
||||
testShutdownRejection()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
|
||||
testcase("shutdown rejection");
|
||||
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FORCE_MULTI_THREAD = true;
|
||||
return cfg;
|
||||
}));
|
||||
|
||||
// Stop the JobQueue
|
||||
env.app().getJobQueue().stop();
|
||||
|
||||
auto runner = env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
|
||||
BEAST_EXPECT(!runner);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exercises expectEarlyExit() when the coroutine has never run
|
||||
* (finished_ is false). This covers the if-body that decrements
|
||||
* nSuspend_ and sets finished_ = true.
|
||||
*/
|
||||
/**
 * Exercises expectEarlyExit() when the coroutine has never run
 * (finished_ is false). This covers the if-body that decrements
 * nSuspend_ and sets finished_ = true.
 */
void
testExpectEarlyExit()
{
    using namespace jtx;

    testcase("expectEarlyExit with unfinished coroutine");

    Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
        cfg->FORCE_MULTI_THREAD = true;
        return cfg;
    }));

    auto& jq = env.app().getJobQueue();

    // Create a runner directly (bypassing postCoroTask) so we can
    // control the lifecycle and exercise the early-exit path.
    auto runner = std::make_shared<JobQueue::CoroTaskRunner>(
        JobQueue::CoroTaskRunner::create_t{}, jq, jtCLIENT, "TestEarlyExit");
    runner->init([](auto) -> CoroTask<void> { co_return; });

    // Simulate the nSuspend_ increment that postCoroTask normally does.
    runner->onSuspend();

    // expectEarlyExit: finished_ is false, so the if-body runs
    // (decrements nSuspend_, sets finished_ = true, destroys frame).
    runner->expectEarlyExit();

    // The never-run coroutine is now treated as complete.
    BEAST_EXPECT(!runner->runnable());
}
|
||||
|
||||
/// Entry point invoked by the test framework: runs each sub-test in turn.
void
run() override
{
    testVoidCompletion();
    testCorrectOrder();
    testIncorrectOrder();
    testJobQueueAwaiter();
    testThreadSpecificStorage();
    testExceptionPropagation();
    testMultipleYields();
    testValueReturn();
    testValueException();
    testValueChaining();
    testShutdownRejection();
    testExpectEarlyExit();
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
|
||||
|
||||
} // namespace test
|
||||
} // namespace xrpl
|
||||
@@ -40,11 +40,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
void
|
||||
correct_order()
|
||||
{
|
||||
@@ -59,15 +54,13 @@ public:
|
||||
}));
|
||||
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobQueue::CoroTaskRunner> c;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTest", [cp = &c, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
|
||||
*cp = runner;
|
||||
g1p->signal();
|
||||
co_await runner->suspend();
|
||||
g2p->signal();
|
||||
co_return;
|
||||
});
|
||||
std::shared_ptr<JobQueue::Coro> c;
|
||||
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
|
||||
c = cr;
|
||||
g1.signal();
|
||||
c->yield();
|
||||
g2.signal();
|
||||
});
|
||||
BEAST_EXPECT(g1.wait_for(5s));
|
||||
c->join();
|
||||
c->post();
|
||||
@@ -88,17 +81,11 @@ public:
|
||||
}));
|
||||
|
||||
gate g;
|
||||
env.app().getJobQueue().postCoroTask(
|
||||
jtCLIENT, "CoroTest", [gp = &g](auto runner) -> CoroTask<void> {
|
||||
// Schedule a resume before suspending. The posted job
|
||||
// cannot actually call resume() until the current resume()
|
||||
// releases CoroTaskRunner::mutex_, which only happens after
|
||||
// the coroutine suspends at co_await.
|
||||
runner->post();
|
||||
co_await runner->suspend();
|
||||
gp->signal();
|
||||
co_return;
|
||||
});
|
||||
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
|
||||
c->post();
|
||||
c->yield();
|
||||
g.signal();
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
}
|
||||
|
||||
@@ -114,7 +101,7 @@ public:
|
||||
auto& jq = env.app().getJobQueue();
|
||||
|
||||
static int const N = 4;
|
||||
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
|
||||
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
|
||||
|
||||
LocalValue<int> lv(-1);
|
||||
BEAST_EXPECT(*lv == -1);
|
||||
@@ -131,23 +118,19 @@ public:
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
jq.postCoroTask(
|
||||
jtCLIENT,
|
||||
"CoroTest",
|
||||
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
|
||||
(*ap)[id] = runner;
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
|
||||
a[id] = c;
|
||||
g.signal();
|
||||
c->yield();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == -1);
|
||||
**lvp = id;
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
gp->signal();
|
||||
co_await runner->suspend();
|
||||
this->BEAST_EXPECT(*lv == -1);
|
||||
*lv = id;
|
||||
this->BEAST_EXPECT(*lv == id);
|
||||
g.signal();
|
||||
c->yield();
|
||||
|
||||
this->BEAST_EXPECT(**lvp == id);
|
||||
co_return;
|
||||
});
|
||||
this->BEAST_EXPECT(*lv == id);
|
||||
});
|
||||
BEAST_EXPECT(g.wait_for(5s));
|
||||
a[i]->join();
|
||||
}
|
||||
|
||||
@@ -43,91 +43,87 @@ class JobQueue_test : public beast::unit_test::suite
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
|
||||
// pointer-by-value captures instead of [&] to work around a GCC 14
|
||||
// bug where reference captures in coroutine lambdas are corrupted
|
||||
// in the coroutine frame.
|
||||
|
||||
void
|
||||
testPostCoroTask()
|
||||
testPostCoro()
|
||||
{
|
||||
jtx::Env env{*this};
|
||||
|
||||
JobQueue& jQueue = env.app().getJobQueue();
|
||||
{
|
||||
// Test repeated post()s until the coroutine completes.
|
||||
// Test repeated post()s until the Coro completes.
|
||||
std::atomic<int> yieldCount{0};
|
||||
auto const runner = jQueue.postCoroTask(
|
||||
jtCLIENT, "PostCoroTest1", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
|
||||
while (++(*ycp) < 4)
|
||||
co_await runner->suspend();
|
||||
co_return;
|
||||
auto const coro = jQueue.postCoro(
|
||||
jtCLIENT,
|
||||
"PostCoroTest1",
|
||||
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
|
||||
while (++yieldCount < 4)
|
||||
coroCopy->yield();
|
||||
});
|
||||
BEAST_EXPECT(runner != nullptr);
|
||||
BEAST_EXPECT(coro != nullptr);
|
||||
|
||||
// Wait for the Job to run and yield.
|
||||
while (yieldCount == 0)
|
||||
;
|
||||
|
||||
// Now re-post until the CoroTaskRunner says it is done.
|
||||
// Now re-post until the Coro says it is done.
|
||||
int old = yieldCount;
|
||||
while (runner->runnable())
|
||||
while (coro->runnable())
|
||||
{
|
||||
BEAST_EXPECT(runner->post());
|
||||
BEAST_EXPECT(coro->post());
|
||||
while (old == yieldCount)
|
||||
{
|
||||
}
|
||||
runner->join();
|
||||
coro->join();
|
||||
BEAST_EXPECT(++old == yieldCount);
|
||||
}
|
||||
BEAST_EXPECT(yieldCount == 4);
|
||||
}
|
||||
{
|
||||
// Test repeated resume()s until the coroutine completes.
|
||||
// Test repeated resume()s until the Coro completes.
|
||||
int yieldCount{0};
|
||||
auto const runner = jQueue.postCoroTask(
|
||||
jtCLIENT, "PostCoroTest2", [ycp = &yieldCount](auto runner) -> CoroTask<void> {
|
||||
while (++(*ycp) < 4)
|
||||
co_await runner->suspend();
|
||||
co_return;
|
||||
auto const coro = jQueue.postCoro(
|
||||
jtCLIENT,
|
||||
"PostCoroTest2",
|
||||
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
|
||||
while (++yieldCount < 4)
|
||||
coroCopy->yield();
|
||||
});
|
||||
if (!runner)
|
||||
if (!coro)
|
||||
{
|
||||
// There's no good reason we should not get a runner, but we
|
||||
// There's no good reason we should not get a Coro, but we
|
||||
// can't continue without one.
|
||||
BEAST_EXPECT(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for the Job to run and yield.
|
||||
runner->join();
|
||||
coro->join();
|
||||
|
||||
// Now resume until the CoroTaskRunner says it is done.
|
||||
// Now resume until the Coro says it is done.
|
||||
int old = yieldCount;
|
||||
while (runner->runnable())
|
||||
while (coro->runnable())
|
||||
{
|
||||
runner->resume(); // Resume runs synchronously on this thread.
|
||||
coro->resume(); // Resume runs synchronously on this thread.
|
||||
BEAST_EXPECT(++old == yieldCount);
|
||||
}
|
||||
BEAST_EXPECT(yieldCount == 4);
|
||||
}
|
||||
{
|
||||
// If the JobQueue is stopped, we should no
|
||||
// longer be able to post a coroutine (and calling postCoroTask()
|
||||
// should return nullptr).
|
||||
// longer be able to add a Coro (and calling postCoro() should
|
||||
// return false).
|
||||
using namespace std::chrono_literals;
|
||||
jQueue.stop();
|
||||
|
||||
// The coroutine should never run, so having it access this
|
||||
// The Coro should never run, so having the Coro access this
|
||||
// unprotected variable on the stack should be completely safe.
|
||||
// Not recommended for the faint of heart...
|
||||
bool unprotected;
|
||||
auto const runner = jQueue.postCoroTask(
|
||||
jtCLIENT, "PostCoroTest3", [up = &unprotected](auto) -> CoroTask<void> {
|
||||
*up = false;
|
||||
co_return;
|
||||
auto const coro = jQueue.postCoro(
|
||||
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
|
||||
unprotected = false;
|
||||
});
|
||||
BEAST_EXPECT(runner == nullptr);
|
||||
BEAST_EXPECT(coro == nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,7 +132,7 @@ public:
|
||||
run() override
|
||||
{
|
||||
testAddJob();
|
||||
testPostCoroTask();
|
||||
testPostCoro();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -39,9 +39,9 @@ public:
|
||||
a = std::move(b);
|
||||
Account c(std::move(a));
|
||||
}
|
||||
Account("alice"); // NOLINT(bugprone-unused-raii)
|
||||
Account("alice", KeyType::secp256k1); // NOLINT(bugprone-unused-raii)
|
||||
Account("alice", KeyType::ed25519); // NOLINT(bugprone-unused-raii)
|
||||
Account("alice");
|
||||
Account("alice", KeyType::secp256k1);
|
||||
Account("alice", KeyType::ed25519);
|
||||
auto const gw = Account("gw");
|
||||
[](AccountID) {}(gw);
|
||||
auto const USD = gw["USD"];
|
||||
@@ -56,11 +56,11 @@ public:
|
||||
{
|
||||
using namespace jtx;
|
||||
|
||||
PrettyAmount(0); // NOLINT(bugprone-unused-raii)
|
||||
PrettyAmount(1); // NOLINT(bugprone-unused-raii)
|
||||
PrettyAmount(0u); // NOLINT(bugprone-unused-raii)
|
||||
PrettyAmount(1u); // NOLINT(bugprone-unused-raii)
|
||||
PrettyAmount(-1); // NOLINT(bugprone-unused-raii)
|
||||
PrettyAmount(0);
|
||||
PrettyAmount(1);
|
||||
PrettyAmount(0u);
|
||||
PrettyAmount(1u);
|
||||
PrettyAmount(-1);
|
||||
static_assert(!std::is_trivially_constructible<PrettyAmount, char>::value, "");
|
||||
static_assert(!std::is_trivially_constructible<PrettyAmount, unsigned char>::value, "");
|
||||
static_assert(!std::is_trivially_constructible<PrettyAmount, short>::value, "");
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#include <xrpld/rpc/RPCHandler.h>
|
||||
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/protocol/ApiVersion.h>
|
||||
#include <xrpl/protocol/STParsedJSON.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
@@ -194,6 +193,7 @@ AMMTest::find_paths_request(
|
||||
c,
|
||||
Role::USER,
|
||||
{},
|
||||
{},
|
||||
RPC::apiVersionIfUnspecified},
|
||||
{},
|
||||
{}};
|
||||
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
|
||||
|
||||
Json::Value result;
|
||||
gate g;
|
||||
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
|
||||
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
|
||||
context.params = std::move(params);
|
||||
context.coro = coro;
|
||||
RPC::doCommand(context, result);
|
||||
g.signal();
|
||||
co_return;
|
||||
});
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -66,8 +66,8 @@ public:
|
||||
negotiateProtocolVersion("RTXP/1.2, XRPL/2.0, XRPL/2.1") == make_protocol(2, 1));
|
||||
BEAST_EXPECT(negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
|
||||
BEAST_EXPECT(
|
||||
negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
|
||||
make_protocol(2, 2));
|
||||
negotiateProtocolVersion("RTXP/1.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
|
||||
make_protocol(2, 3));
|
||||
BEAST_EXPECT(negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt);
|
||||
BEAST_EXPECT(negotiateProtocolVersion("") == std::nullopt);
|
||||
}
|
||||
|
||||
@@ -103,14 +103,17 @@ public:
|
||||
destination.setSynching();
|
||||
|
||||
{
|
||||
std::vector<std::pair<SHAMapNodeID, Blob>> a;
|
||||
std::vector<std::tuple<SHAMapNodeID, Blob, bool>> a;
|
||||
|
||||
BEAST_EXPECT(source.getNodeFat(SHAMapNodeID(), a, rand_bool(eng_), rand_int(eng_, 2)));
|
||||
|
||||
unexpected(a.size() < 1, "NodeSize");
|
||||
|
||||
BEAST_EXPECT(destination.addRootNode(source.getHash(), makeSlice(a[0].second), nullptr)
|
||||
.isGood());
|
||||
auto node = SHAMapTreeNode::makeFromWire(makeSlice(std::get<1>(a[0])));
|
||||
if (!node)
|
||||
fail("", __FILE__, __LINE__);
|
||||
BEAST_EXPECT(
|
||||
destination.addRootNode(source.getHash(), std::move(node), nullptr).isGood());
|
||||
}
|
||||
|
||||
do
|
||||
@@ -124,7 +127,7 @@ public:
|
||||
break;
|
||||
|
||||
// get as many nodes as possible based on this information
|
||||
std::vector<std::pair<SHAMapNodeID, Blob>> b;
|
||||
std::vector<std::tuple<SHAMapNodeID, Blob, bool>> b;
|
||||
|
||||
for (auto& it : nodesMissing)
|
||||
{
|
||||
@@ -146,7 +149,10 @@ public:
|
||||
// Don't use BEAST_EXPECT here b/c it will be called a
|
||||
// non-deterministic number of times and the number of tests run
|
||||
// should be deterministic
|
||||
if (!destination.addKnownNode(b[i].first, makeSlice(b[i].second), nullptr)
|
||||
auto node = SHAMapTreeNode::makeFromWire(makeSlice(std::get<1>(b[i])));
|
||||
if (!node)
|
||||
fail("", __FILE__, __LINE__);
|
||||
if (!destination.addKnownNode(std::get<0>(b[i]), std::move(node), nullptr)
|
||||
.isUseful())
|
||||
fail("", __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -131,16 +132,16 @@ private:
|
||||
processData(std::shared_ptr<Peer> peer, protocol::TMLedgerData& data);
|
||||
|
||||
bool
|
||||
takeHeader(std::string const& data);
|
||||
takeHeader(std::string_view data);
|
||||
|
||||
void
|
||||
receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode&);
|
||||
receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san);
|
||||
|
||||
bool
|
||||
takeTxRootNode(Slice const& data, SHAMapAddNode&);
|
||||
takeTxRootNode(std::string_view data, SHAMapAddNode& san);
|
||||
|
||||
bool
|
||||
takeAsRootNode(Slice const& data, SHAMapAddNode&);
|
||||
takeAsRootNode(std::string_view data, SHAMapAddNode& san);
|
||||
|
||||
std::vector<uint256>
|
||||
neededTxHashes(int max, SHAMapSyncFilter* filter) const;
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#include <xrpld/app/ledger/InboundLedgers.h>
|
||||
#include <xrpld/app/ledger/LedgerMaster.h>
|
||||
#include <xrpld/app/ledger/TransactionStateSF.h>
|
||||
#include <xrpld/app/ledger/detail/LedgerNodeHelpers.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpld/overlay/Overlay.h>
|
||||
|
||||
@@ -754,7 +755,7 @@ InboundLedger::filterNodes(
|
||||
*/
|
||||
// data must not have hash prefix
|
||||
bool
|
||||
InboundLedger::takeHeader(std::string const& data)
|
||||
InboundLedger::takeHeader(std::string_view data)
|
||||
{
|
||||
// Return value: true=normal, false=bad data
|
||||
JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
|
||||
@@ -839,21 +840,28 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
|
||||
{
|
||||
auto const f = filter.get();
|
||||
|
||||
for (auto const& node : packet.nodes())
|
||||
for (auto const& ledger_node : packet.nodes())
|
||||
{
|
||||
auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
|
||||
auto treeNode = getTreeNode(ledger_node.nodedata());
|
||||
if (!treeNode)
|
||||
{
|
||||
JLOG(journal_.warn()) << "Got invalid node data";
|
||||
san.incInvalid();
|
||||
return;
|
||||
}
|
||||
|
||||
auto const nodeID = getSHAMapNodeID(ledger_node, *treeNode);
|
||||
if (!nodeID)
|
||||
throw std::runtime_error("data does not properly deserialize");
|
||||
{
|
||||
JLOG(journal_.warn()) << "Got invalid node id";
|
||||
san.incInvalid();
|
||||
return;
|
||||
}
|
||||
|
||||
if (nodeID->isRoot())
|
||||
{
|
||||
san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
|
||||
}
|
||||
san += map.addRootNode(rootHash, std::move(*treeNode), f);
|
||||
else
|
||||
{
|
||||
san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
|
||||
}
|
||||
san += map.addKnownNode(*nodeID, std::move(*treeNode), f);
|
||||
|
||||
if (!san.isGood())
|
||||
{
|
||||
@@ -888,7 +896,7 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
|
||||
Call with a lock
|
||||
*/
|
||||
bool
|
||||
InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
|
||||
InboundLedger::takeAsRootNode(std::string_view data, SHAMapAddNode& san)
|
||||
{
|
||||
if (failed_ || mHaveState)
|
||||
{
|
||||
@@ -904,9 +912,17 @@ InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
|
||||
auto treeNode = getTreeNode(data);
|
||||
if (!treeNode)
|
||||
{
|
||||
JLOG(journal_.warn()) << "Got invalid node data";
|
||||
san.incInvalid();
|
||||
return false;
|
||||
}
|
||||
|
||||
AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
|
||||
san +=
|
||||
mLedger->stateMap().addRootNode(SHAMapHash{mLedger->header().accountHash}, data, &filter);
|
||||
san += mLedger->stateMap().addRootNode(
|
||||
SHAMapHash{mLedger->header().accountHash}, std::move(*treeNode), &filter);
|
||||
return san.isGood();
|
||||
}
|
||||
|
||||
@@ -914,7 +930,7 @@ InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
|
||||
Call with a lock
|
||||
*/
|
||||
bool
|
||||
InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
|
||||
InboundLedger::takeTxRootNode(std::string_view data, SHAMapAddNode& san)
|
||||
{
|
||||
if (failed_ || mHaveTransactions)
|
||||
{
|
||||
@@ -930,8 +946,17 @@ InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
|
||||
auto treeNode = getTreeNode(data);
|
||||
if (!treeNode)
|
||||
{
|
||||
JLOG(journal_.warn()) << "Got invalid node data";
|
||||
san.incInvalid();
|
||||
return false;
|
||||
}
|
||||
|
||||
TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
|
||||
san += mLedger->txMap().addRootNode(SHAMapHash{mLedger->header().txHash}, data, &filter);
|
||||
san += mLedger->txMap().addRootNode(
|
||||
SHAMapHash{mLedger->header().txHash}, std::move(*treeNode), &filter);
|
||||
return san.isGood();
|
||||
}
|
||||
|
||||
@@ -1028,13 +1053,13 @@ InboundLedger::processData(std::shared_ptr<Peer> peer, protocol::TMLedgerData& p
|
||||
}
|
||||
|
||||
if (!mHaveState && (packet.nodes().size() > 1) &&
|
||||
!takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
|
||||
!takeAsRootNode(packet.nodes(1).nodedata(), san))
|
||||
{
|
||||
JLOG(journal_.warn()) << "Included AS root invalid";
|
||||
}
|
||||
|
||||
if (!mHaveTransactions && (packet.nodes().size() > 2) &&
|
||||
!takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
|
||||
!takeTxRootNode(packet.nodes(2).nodedata(), san))
|
||||
{
|
||||
JLOG(journal_.warn()) << "Included TX root invalid";
|
||||
}
|
||||
@@ -1065,13 +1090,13 @@ InboundLedger::processData(std::shared_ptr<Peer> peer, protocol::TMLedgerData& p
|
||||
|
||||
ScopedLockType sl(mtx_);
|
||||
|
||||
// Verify node IDs and data are complete
|
||||
for (auto const& node : packet.nodes())
|
||||
// Verify nodes are complete
|
||||
for (auto const& ledger_node : packet.nodes())
|
||||
{
|
||||
if (!node.has_nodeid() || !node.has_nodedata())
|
||||
if (!validateLedgerNode(ledger_node))
|
||||
{
|
||||
JLOG(journal_.warn()) << "Got bad node";
|
||||
peer->charge(Resource::feeMalformedRequest, "ledger_data bad node");
|
||||
JLOG(journal_.warn()) << "Got malformed ledger node";
|
||||
peer->charge(Resource::feeMalformedRequest, "ledger_node");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#include <xrpld/app/ledger/InboundLedgers.h>
|
||||
#include <xrpld/app/ledger/LedgerMaster.h>
|
||||
#include <xrpld/app/ledger/detail/LedgerNodeHelpers.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
|
||||
#include <xrpl/basics/DecayingSample.h>
|
||||
@@ -222,23 +223,21 @@ public:
|
||||
Serializer s;
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < packet_ptr->nodes().size(); ++i)
|
||||
for (auto const& ledger_node : packet_ptr->nodes())
|
||||
{
|
||||
auto const& node = packet_ptr->nodes(i);
|
||||
|
||||
if (!node.has_nodeid() || !node.has_nodedata())
|
||||
if (!validateLedgerNode(ledger_node))
|
||||
return;
|
||||
|
||||
auto newNode = SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
|
||||
|
||||
if (!newNode)
|
||||
auto const treeNode = getTreeNode(ledger_node.nodedata());
|
||||
if (!treeNode)
|
||||
return;
|
||||
auto const tn = *treeNode;
|
||||
|
||||
s.erase();
|
||||
newNode->serializeWithPrefix(s);
|
||||
tn->serializeWithPrefix(s);
|
||||
|
||||
app_.getLedgerMaster().addFetchPack(
|
||||
newNode->getHash().as_uint256(), std::make_shared<Blob>(s.begin(), s.end()));
|
||||
tn->getHash().as_uint256(), std::make_shared<Blob>(s.begin(), s.end()));
|
||||
}
|
||||
}
|
||||
catch (std::exception const&) // NOLINT(bugprone-empty-catch)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#include <xrpld/app/ledger/InboundLedgers.h>
|
||||
#include <xrpld/app/ledger/InboundTransactions.h>
|
||||
#include <xrpld/app/ledger/detail/LedgerNodeHelpers.h>
|
||||
#include <xrpld/app/ledger/detail/TransactionAcquire.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
|
||||
@@ -132,29 +133,38 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<std::pair<SHAMapNodeID, Slice>> data;
|
||||
std::vector<std::pair<SHAMapNodeID, intr_ptr::SharedPtr<SHAMapTreeNode>>> data;
|
||||
data.reserve(packet.nodes().size());
|
||||
|
||||
for (auto const& node : packet.nodes())
|
||||
for (auto const& ledger_node : packet.nodes())
|
||||
{
|
||||
if (!node.has_nodeid() || !node.has_nodedata())
|
||||
if (!validateLedgerNode(ledger_node))
|
||||
{
|
||||
peer->charge(Resource::feeMalformedRequest, "ledger_data");
|
||||
JLOG(j_.warn()) << "Got malformed ledger node";
|
||||
peer->charge(Resource::feeMalformedRequest, "ledger_node");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const id = deserializeSHAMapNodeID(node.nodeid());
|
||||
|
||||
if (!id)
|
||||
auto treeNode = getTreeNode(ledger_node.nodedata());
|
||||
if (!treeNode)
|
||||
{
|
||||
peer->charge(Resource::feeInvalidData, "ledger_data");
|
||||
JLOG(j_.warn()) << "Got invalid node data";
|
||||
peer->charge(Resource::feeInvalidData, "node_data");
|
||||
return;
|
||||
}
|
||||
|
||||
data.emplace_back(std::make_pair(*id, makeSlice(node.nodedata())));
|
||||
auto const nodeID = getSHAMapNodeID(ledger_node, *treeNode);
|
||||
if (!nodeID)
|
||||
{
|
||||
JLOG(j_.warn()) << "Got invalid node id";
|
||||
peer->charge(Resource::feeInvalidData, "node_id");
|
||||
return;
|
||||
}
|
||||
|
||||
data.emplace_back(std::make_pair(*nodeID, std::move(*treeNode)));
|
||||
}
|
||||
|
||||
if (!ta->takeNodes(data, peer).isUseful())
|
||||
if (!ta->takeNodes(std::move(data), peer).isUseful())
|
||||
peer->charge(Resource::feeUselessData, "ledger_data not useful");
|
||||
}
|
||||
|
||||
|
||||
93
src/xrpld/app/ledger/detail/LedgerNodeHelpers.cpp
Normal file
93
src/xrpld/app/ledger/detail/LedgerNodeHelpers.cpp
Normal file
@@ -0,0 +1,93 @@
|
||||
#include <xrpld/app/ledger/detail/LedgerNodeHelpers.h>
|
||||
|
||||
#include <xrpl/basics/IntrusivePointer.h>
|
||||
#include <xrpl/basics/Slice.h>
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
#include <xrpl/shamap/SHAMap.h>
|
||||
#include <xrpl/shamap/SHAMapLeafNode.h>
|
||||
#include <xrpl/shamap/SHAMapNodeID.h>
|
||||
#include <xrpl/shamap/SHAMapTreeNode.h>
|
||||
|
||||
#include <optional>
|
||||
#include <string>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
bool
|
||||
validateLedgerNode(protocol::TMLedgerNode const& ledger_node)
|
||||
{
|
||||
if (!ledger_node.has_nodedata())
|
||||
return false;
|
||||
|
||||
if (ledger_node.has_nodeid())
|
||||
return !ledger_node.has_id() && !ledger_node.has_depth();
|
||||
|
||||
return ledger_node.has_id() ||
|
||||
(ledger_node.has_depth() && ledger_node.depth() <= SHAMap::leafDepth);
|
||||
}
|
||||
|
||||
std::optional<intr_ptr::SharedPtr<SHAMapTreeNode>>
|
||||
getTreeNode(std::string_view data)
|
||||
{
|
||||
auto const slice = makeSlice(data);
|
||||
try
|
||||
{
|
||||
auto treeNode = SHAMapTreeNode::makeFromWire(slice);
|
||||
if (!treeNode)
|
||||
return std::nullopt;
|
||||
return treeNode;
|
||||
}
|
||||
catch (std::exception const&)
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<SHAMapNodeID>
|
||||
getSHAMapNodeID(
|
||||
protocol::TMLedgerNode const& ledger_node,
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode> const& treeNode)
|
||||
{
|
||||
if (ledger_node.has_id() || ledger_node.has_depth())
|
||||
{
|
||||
if (treeNode->isInner())
|
||||
{
|
||||
if (!ledger_node.has_id())
|
||||
return std::nullopt;
|
||||
|
||||
return deserializeSHAMapNodeID(ledger_node.id());
|
||||
}
|
||||
|
||||
if (treeNode->isLeaf())
|
||||
{
|
||||
if (!ledger_node.has_depth())
|
||||
return std::nullopt;
|
||||
|
||||
auto const key = static_cast<SHAMapLeafNode const*>(treeNode.get())->peekItem()->key();
|
||||
return SHAMapNodeID::createID(ledger_node.depth(), key);
|
||||
}
|
||||
|
||||
UNREACHABLE("xrpl::getSHAMapNodeID : tree node is neither inner nor leaf");
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
if (!ledger_node.has_nodeid())
|
||||
return std::nullopt;
|
||||
|
||||
auto const nodeID = deserializeSHAMapNodeID(ledger_node.nodeid());
|
||||
if (!nodeID.has_value())
|
||||
return std::nullopt;
|
||||
|
||||
if (treeNode->isLeaf())
|
||||
{
|
||||
auto const key = static_cast<SHAMapLeafNode const*>(treeNode.get())->peekItem()->key();
|
||||
auto const expected_id = SHAMapNodeID::createID(static_cast<int>(nodeID->getDepth()), key);
|
||||
if (nodeID->getNodeID() != expected_id.getNodeID())
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return nodeID;
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
75
src/xrpld/app/ledger/detail/LedgerNodeHelpers.h
Normal file
75
src/xrpld/app/ledger/detail/LedgerNodeHelpers.h
Normal file
@@ -0,0 +1,75 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/basics/IntrusivePointer.h>
|
||||
#include <xrpl/shamap/SHAMapNodeID.h>
|
||||
#include <xrpl/shamap/SHAMapTreeNode.h>
|
||||
|
||||
#include <optional>
|
||||
#include <string_view>
|
||||
|
||||
namespace protocol {
|
||||
class TMLedgerNode;
|
||||
} // namespace protocol
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* @brief Validates a ledger node proto message.
|
||||
*
|
||||
* This function checks whether a ledger node has the expected fields (for non-ledger base data):
|
||||
* - The node must have `nodedata`.
|
||||
* - If the legacy `nodeid` field is present then the new `id` and `depth` fields must not be
|
||||
* present.
|
||||
* - If the new `id` or `depth` fields are present (it is a oneof field, so only one of the two can
|
||||
* be set) then the legacy `nodeid` must not be present.
|
||||
* - If the `depth` field is present then it must be between 0 and SHAMap::leafDepth (inclusive).
|
||||
*
|
||||
* @param ledger_node The ledger node to validate.
|
||||
* @return true if the ledger node has the expected fields, false otherwise.
|
||||
*/
|
||||
[[nodiscard]] bool
|
||||
validateLedgerNode(protocol::TMLedgerNode const& ledger_node);
|
||||
|
||||
/**
|
||||
* @brief Deserializes a SHAMapTreeNode from wire format data.
|
||||
*
|
||||
* This function attempts to create a SHAMapTreeNode from the provided data string. If the data is
|
||||
* malformed or deserialization fails, the function returns std::nullopt instead of throwing an
|
||||
* exception.
|
||||
*
|
||||
* @param data The serialized node data in wire format.
|
||||
* @return An optional containing the deserialized tree node if successful, or std::nullopt if
|
||||
* deserialization fails.
|
||||
*/
|
||||
[[nodiscard]] std::optional<intr_ptr::SharedPtr<SHAMapTreeNode>>
|
||||
getTreeNode(std::string_view data);
|
||||
|
||||
/**
|
||||
* @brief Extracts or reconstructs the SHAMapNodeID from a ledger node proto message.
|
||||
*
|
||||
* This function retrieves the SHAMapNodeID for a tree node, with behavior that depends on which
|
||||
* field is set and the node type (inner vs. leaf).
|
||||
*
|
||||
* When the legacy `nodeid` field is set in the message:
|
||||
* - For all nodes: Deserializes the node ID from the field.
|
||||
* - For leaf nodes: Validates that the node ID is consistent with the leaf's key.
|
||||
*
|
||||
* When the new `id` or `depth` field is set in the message:
|
||||
* - For inner nodes: Deserializes the node ID from the `id` field. Note that root nodes are also
|
||||
* inner nodes.
|
||||
* - For leaf nodes: Reconstructs the node ID using both the depth from the `depth` field and the
|
||||
* key from the leaf node's item.
|
||||
*
|
||||
* @param ledger_node The validated protocol message containing the ledger node data.
|
||||
* @param treeNode The deserialized tree node (inner or leaf node).
|
||||
* @return An optional containing the node ID if extraction/reconstruction succeeds, or std::nullopt
|
||||
* if the required fields are missing or validation fails.
|
||||
* @note This function expects that the caller has already validated the ledger node by calling the
|
||||
* `validateLedgerNode` function and obtained a valid tree node by calling `getTreeNode`.
|
||||
*/
|
||||
[[nodiscard]] std::optional<SHAMapNodeID>
|
||||
getSHAMapNodeID(
|
||||
protocol::TMLedgerNode const& ledger_node,
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode> const& treeNode);
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -154,7 +154,7 @@ TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer)
|
||||
|
||||
SHAMapAddNode
|
||||
TransactionAcquire::takeNodes(
|
||||
std::vector<std::pair<SHAMapNodeID, Slice>> const& data,
|
||||
std::vector<std::pair<SHAMapNodeID, intr_ptr::SharedPtr<SHAMapTreeNode>>>&& data,
|
||||
std::shared_ptr<Peer> const& peer)
|
||||
{
|
||||
ScopedLockType sl(mtx_);
|
||||
@@ -178,20 +178,19 @@ TransactionAcquire::takeNodes(
|
||||
|
||||
ConsensusTransSetSF sf(app_, app_.getTempNodeCache());
|
||||
|
||||
for (auto const& d : data)
|
||||
for (auto& d : data)
|
||||
{
|
||||
if (d.first.isRoot())
|
||||
{
|
||||
if (mHaveRoot)
|
||||
JLOG(journal_.debug()) << "Got root TXS node, already have it";
|
||||
else if (!mMap->addRootNode(SHAMapHash{hash_}, d.second, nullptr).isGood())
|
||||
{
|
||||
else if (!mMap->addRootNode(SHAMapHash{hash_}, std::move(d.second), nullptr)
|
||||
.isGood())
|
||||
JLOG(journal_.warn()) << "TX acquire got bad root node";
|
||||
}
|
||||
else
|
||||
mHaveRoot = true;
|
||||
}
|
||||
else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
|
||||
else if (!mMap->addKnownNode(d.first, std::move(d.second), &sf).isGood())
|
||||
{
|
||||
JLOG(journal_.warn()) << "TX acquire got bad non-root node";
|
||||
return SHAMapAddNode::invalid();
|
||||
|
||||
@@ -20,8 +20,8 @@ public:
|
||||
|
||||
SHAMapAddNode
|
||||
takeNodes(
|
||||
std::vector<std::pair<SHAMapNodeID, Slice>> const& data,
|
||||
std::shared_ptr<Peer> const&);
|
||||
std::vector<std::pair<SHAMapNodeID, intr_ptr::SharedPtr<SHAMapTreeNode>>>&& data,
|
||||
std::shared_ptr<Peer> const& peer);
|
||||
|
||||
void
|
||||
init(int startPeers);
|
||||
|
||||
@@ -1431,6 +1431,7 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
|
||||
c,
|
||||
Role::ADMIN,
|
||||
{},
|
||||
{},
|
||||
RPC::apiMaximumSupportedVersion},
|
||||
jvCommand};
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
#include <xrpl/beast/core/CurrentThreadName.h>
|
||||
#include <xrpl/beast/net/IPAddressConversion.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -100,14 +99,13 @@ GRPCServerImpl::CallData<Request, Response>::process()
|
||||
// ensures that finished is always true when this CallData object
|
||||
// is returned as a tag in handleRpcs(), after sending the response
|
||||
finished_ = true;
|
||||
auto runner = app_.getJobQueue().postCoroTask(
|
||||
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
|
||||
thisShared->processRequest();
|
||||
co_return;
|
||||
auto coro = app_.getJobQueue().postCoro(
|
||||
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
|
||||
thisShared->process(coro);
|
||||
});
|
||||
|
||||
// If runner is null, then the JobQueue has already been shutdown
|
||||
if (!runner)
|
||||
// If coro is null, then the JobQueue has already been shutdown
|
||||
if (!coro)
|
||||
{
|
||||
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
|
||||
responder_.FinishWithError(status, this);
|
||||
@@ -116,7 +114,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
|
||||
|
||||
template <class Request, class Response>
|
||||
void
|
||||
GRPCServerImpl::CallData<Request, Response>::processRequest()
|
||||
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -158,6 +156,7 @@ GRPCServerImpl::CallData<Request, Response>::processRequest()
|
||||
app_.getLedgerMaster(),
|
||||
usage,
|
||||
role,
|
||||
coro,
|
||||
InfoSub::pointer(),
|
||||
apiVersion},
|
||||
request_};
|
||||
|
||||
@@ -206,12 +206,9 @@ private:
|
||||
clone() override;
|
||||
|
||||
private:
|
||||
/**
|
||||
* Process the gRPC request. Called inside the CoroTask lambda
|
||||
* posted to the JobQueue by process().
|
||||
*/
|
||||
// process the request. Called inside the coroutine passed to JobQueue
|
||||
void
|
||||
processRequest();
|
||||
process(std::shared_ptr<JobQueue::Coro> coro);
|
||||
|
||||
// return load type of this RPC
|
||||
Resource::Charge
|
||||
|
||||
@@ -17,6 +17,7 @@ enum class ProtocolFeature {
|
||||
ValidatorListPropagation,
|
||||
ValidatorList2Propagation,
|
||||
LedgerReplay,
|
||||
LedgerNodeDepth,
|
||||
};
|
||||
|
||||
/** Represents a peer connection in the overlay. */
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
#include <sstream>
|
||||
#include <tuple>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -471,6 +472,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
|
||||
return protocol_ >= make_protocol(2, 1);
|
||||
case ProtocolFeature::ValidatorList2Propagation:
|
||||
return protocol_ >= make_protocol(2, 2);
|
||||
case ProtocolFeature::LedgerNodeDepth:
|
||||
return protocol_ >= make_protocol(2, 3);
|
||||
case ProtocolFeature::LedgerReplay:
|
||||
return ledgerReplayEnabled_;
|
||||
}
|
||||
@@ -3237,13 +3240,19 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
|
||||
{
|
||||
auto const queryDepth{m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
|
||||
|
||||
std::vector<std::pair<SHAMapNodeID, Blob>> data;
|
||||
std::vector<std::tuple<SHAMapNodeID, Blob, bool>> data;
|
||||
auto const useLedgerNodeDepth = supportsFeature(ProtocolFeature::LedgerNodeDepth);
|
||||
|
||||
for (int i = 0;
|
||||
i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
|
||||
++i)
|
||||
{
|
||||
auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
|
||||
if (!shaMapNodeId.has_value())
|
||||
{
|
||||
JLOG(p_journal_.error()) << "processLedgerRequest: Invalid SHAMap node ID";
|
||||
return;
|
||||
}
|
||||
|
||||
data.clear();
|
||||
data.reserve(Tuning::softMaxReplyNodes);
|
||||
@@ -3259,9 +3268,22 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
|
||||
{
|
||||
if (ledgerData.nodes_size() >= Tuning::hardMaxReplyNodes)
|
||||
break;
|
||||
|
||||
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
|
||||
node->set_nodeid(d.first.getRawString());
|
||||
node->set_nodedata(d.second.data(), d.second.size());
|
||||
|
||||
auto const& nodeData = std::get<1>(d);
|
||||
node->set_nodedata(nodeData.data(), nodeData.size());
|
||||
|
||||
// When the LedgerNodeDepth protocol feature is not supported by the peer,
|
||||
// we always set the `nodeid` field. However, when it is supported then we
|
||||
// set the `id` field for inner nodes and the `depth` field for leaf nodes.
|
||||
auto const& nodeID = std::get<0>(d);
|
||||
if (!useLedgerNodeDepth)
|
||||
node->set_nodeid(nodeID.getRawString());
|
||||
else if (std::get<2>(d))
|
||||
node->set_depth(nodeID.getDepth());
|
||||
else
|
||||
node->set_id(nodeID.getRawString());
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
@@ -21,7 +21,8 @@ namespace xrpl {
|
||||
constexpr ProtocolVersion const supportedProtocolList[]
|
||||
{
|
||||
{2, 1},
|
||||
{2, 2}
|
||||
{2, 2},
|
||||
{2, 3},
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#include <xrpld/rpc/Role.h>
|
||||
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/server/InfoSub.h>
|
||||
|
||||
namespace xrpl {
|
||||
@@ -23,6 +24,7 @@ struct Context
|
||||
LedgerMaster& ledgerMaster;
|
||||
Resource::Consumer& consumer;
|
||||
Role role;
|
||||
std::shared_ptr<JobQueue::Coro> coro{};
|
||||
InfoSub::pointer infoSub{};
|
||||
unsigned int apiVersion;
|
||||
};
|
||||
|
||||
@@ -169,10 +169,13 @@ public:
|
||||
|
||||
private:
|
||||
Json::Value
|
||||
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
|
||||
processSession(
|
||||
std::shared_ptr<WSSession> const& session,
|
||||
std::shared_ptr<JobQueue::Coro> const& coro,
|
||||
Json::Value const& jv);
|
||||
|
||||
void
|
||||
processSession(std::shared_ptr<Session> const&);
|
||||
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
|
||||
|
||||
void
|
||||
processRequest(
|
||||
@@ -180,6 +183,7 @@ private:
|
||||
std::string const& request,
|
||||
beast::IP::Endpoint const& remoteIPAddress,
|
||||
Output&&,
|
||||
std::shared_ptr<JobQueue::Coro> coro,
|
||||
std::string_view forwardedFor,
|
||||
std::string_view user);
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include <xrpl/basics/make_SSLContext.h>
|
||||
#include <xrpl/beast/net/IPAddressConversion.h>
|
||||
#include <xrpl/beast/rfc2616.h>
|
||||
#include <xrpl/core/CoroTask.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/json/to_string.h>
|
||||
@@ -285,17 +284,9 @@ ServerHandler::onRequest(Session& session)
|
||||
}
|
||||
|
||||
std::shared_ptr<Session> detachedSession = session.detach();
|
||||
auto const postResult = m_jobQueue.postCoroTask(
|
||||
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
|
||||
try
|
||||
{
|
||||
processSession(detachedSession);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(m_journal.error()) << "RPC-Client coroutine exception: " << e.what();
|
||||
}
|
||||
co_return;
|
||||
auto const postResult = m_jobQueue.postCoro(
|
||||
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
|
||||
processSession(detachedSession, coro);
|
||||
});
|
||||
if (postResult == nullptr)
|
||||
{
|
||||
@@ -331,26 +322,17 @@ ServerHandler::onWSMessage(
|
||||
|
||||
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
|
||||
|
||||
auto const postResult = m_jobQueue.postCoroTask(
|
||||
auto const postResult = m_jobQueue.postCoro(
|
||||
jtCLIENT_WEBSOCKET,
|
||||
"WS-Client",
|
||||
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
|
||||
try
|
||||
{
|
||||
auto const jr = this->processSession(session, jv);
|
||||
auto const s = to_string(jr);
|
||||
auto const n = s.length();
|
||||
boost::beast::multi_buffer sb(n);
|
||||
sb.commit(
|
||||
boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
|
||||
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
|
||||
session->complete();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(m_journal.error()) << "WS-Client coroutine exception: " << e.what();
|
||||
}
|
||||
co_return;
|
||||
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
|
||||
auto const jr = this->processSession(session, coro, jv);
|
||||
auto const s = to_string(jr);
|
||||
auto const n = s.length();
|
||||
boost::beast::multi_buffer sb(n);
|
||||
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
|
||||
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
|
||||
session->complete();
|
||||
});
|
||||
if (postResult == nullptr)
|
||||
{
|
||||
@@ -391,7 +373,10 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
|
||||
}
|
||||
|
||||
Json::Value
|
||||
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
|
||||
ServerHandler::processSession(
|
||||
std::shared_ptr<WSSession> const& session,
|
||||
std::shared_ptr<JobQueue::Coro> const& coro,
|
||||
Json::Value const& jv)
|
||||
{
|
||||
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
|
||||
if (is->getConsumer().disconnect(m_journal))
|
||||
@@ -458,6 +443,7 @@ ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::V
|
||||
app_.getLedgerMaster(),
|
||||
is->getConsumer(),
|
||||
role,
|
||||
coro,
|
||||
is,
|
||||
apiVersion},
|
||||
jv,
|
||||
@@ -528,14 +514,18 @@ ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::V
|
||||
return jr;
|
||||
}
|
||||
|
||||
// Run as a coroutine.
|
||||
void
|
||||
ServerHandler::processSession(std::shared_ptr<Session> const& session)
|
||||
ServerHandler::processSession(
|
||||
std::shared_ptr<Session> const& session,
|
||||
std::shared_ptr<JobQueue::Coro> coro)
|
||||
{
|
||||
processRequest(
|
||||
session->port(),
|
||||
buffers_to_string(session->request().body().data()),
|
||||
session->remoteAddress().at_port(0),
|
||||
makeOutput(*session),
|
||||
coro,
|
||||
forwardedFor(session->request()),
|
||||
[&] {
|
||||
auto const iter = session->request().find("X-User");
|
||||
@@ -572,6 +562,7 @@ ServerHandler::processRequest(
|
||||
std::string const& request,
|
||||
beast::IP::Endpoint const& remoteIPAddress,
|
||||
Output&& output,
|
||||
std::shared_ptr<JobQueue::Coro> coro,
|
||||
std::string_view forwardedFor,
|
||||
std::string_view user)
|
||||
{
|
||||
@@ -828,6 +819,7 @@ ServerHandler::processRequest(
|
||||
app_.getLedgerMaster(),
|
||||
usage,
|
||||
role,
|
||||
coro,
|
||||
InfoSub::pointer(),
|
||||
apiVersion},
|
||||
params,
|
||||
|
||||
@@ -7,9 +7,6 @@
|
||||
#include <xrpl/protocol/RPCErr.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
// This interface is deprecated.
|
||||
@@ -40,40 +37,98 @@ doRipplePathFind(RPC::JsonContext& context)
|
||||
PathRequest::pointer request;
|
||||
lpLedger = context.ledgerMaster.getClosedLedger();
|
||||
|
||||
// makeLegacyPathRequest enqueues a path-finding job that runs
|
||||
// asynchronously. We block this thread with a condition_variable
|
||||
// until the path-finding continuation signals completion.
|
||||
// If makeLegacyPathRequest cannot schedule the job (e.g. during
|
||||
// shutdown), it returns an empty request and we skip the wait.
|
||||
// Replaces the old Coro yield/resume pattern with synchronous
|
||||
// blocking, eliminating shutdown race conditions.
|
||||
std::mutex mtx;
|
||||
std::condition_variable cv;
|
||||
bool pathDone = false;
|
||||
|
||||
// It doesn't look like there's much odd happening here, but you should
|
||||
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
|
||||
// And we may be flipping around between threads. Here's an overview:
|
||||
//
|
||||
// 1. We're running doRipplePathFind() due to a call to
|
||||
// ripple_path_find. doRipplePathFind() is currently running
|
||||
// inside of a JobQueue::Coro using a JobQueue thread.
|
||||
//
|
||||
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
|
||||
// path-finding request. That request will (probably) run at some
|
||||
// indeterminate future time on a (probably different) JobQueue
|
||||
// thread.
|
||||
//
|
||||
// 3. As a continuation from that path-finding JobQueue thread, the
|
||||
// coroutine we're currently running in (!) is posted to the
|
||||
// JobQueue. Because it is a continuation, that post won't
|
||||
// happen until the path-finding request completes.
|
||||
//
|
||||
// 4. Once the continuation is enqueued, and we have reason to think
|
||||
// the path-finding job is likely to run, then the coroutine we're
|
||||
// running in yield()s. That means it surrenders its thread in
|
||||
// the JobQueue. The coroutine is suspended, but ready to run,
|
||||
// because it is kept resident by a shared_ptr in the
|
||||
// path-finding continuation.
|
||||
//
|
||||
// 5. If all goes well then path-finding runs on a JobQueue thread
|
||||
// and executes its continuation. The continuation posts this
|
||||
// same coroutine (!) to the JobQueue.
|
||||
//
|
||||
// 6. When the JobQueue calls this coroutine, this coroutine resumes
|
||||
// from the line below the coro->yield() and returns the
|
||||
// path-finding result.
|
||||
//
|
||||
// With so many moving parts, what could go wrong?
|
||||
//
|
||||
// Just in terms of the JobQueue refusing to add jobs at shutdown
|
||||
// there are two specific things that can go wrong.
|
||||
//
|
||||
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
|
||||
// rejected (because we're shutting down).
|
||||
//
|
||||
// Fortunately this problem can be addressed by looking at the
|
||||
// return value of makeLegacyPathRequest(). If
|
||||
// makeLegacyPathRequest() cannot get a thread to run the path-find
|
||||
// on, then it returns an empty request.
|
||||
//
|
||||
// 2. The path-finding job might run, but the Coro::post() might be
|
||||
// rejected by the JobQueue (because we're shutting down).
|
||||
//
|
||||
// We handle this case by resuming (not posting) the Coro.
|
||||
// By resuming the Coro, we allow the Coro to run to completion
|
||||
// on the current thread instead of requiring that it run on a
|
||||
// new thread from the JobQueue.
|
||||
//
|
||||
// Both of these failure modes are hard to recreate in a unit test
|
||||
// because they are so dependent on inter-thread timing. However
|
||||
// the failure modes can be observed by synchronously (inside the
|
||||
// rippled source code) shutting down the application. The code to
|
||||
// do so looks like this:
|
||||
//
|
||||
// context.app.signalStop();
|
||||
// while (! context.app.getJobQueue().jobCounter().joined()) { }
|
||||
//
|
||||
// The first line starts the process of shutting down the app.
|
||||
// The second line waits until no more jobs can be added to the
|
||||
// JobQueue before letting the thread continue.
|
||||
//
|
||||
// May 2017
|
||||
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
|
||||
request,
|
||||
[&]() {
|
||||
[&context]() {
|
||||
// Copying the shared_ptr keeps the coroutine alive up
|
||||
// through the return. Otherwise the storage under the
|
||||
// captured reference could evaporate when we return from
|
||||
// coroCopy->resume(). This is not strictly necessary, but
|
||||
// will make maintenance easier.
|
||||
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
|
||||
if (!coroCopy->post())
|
||||
{
|
||||
std::lock_guard lk(mtx);
|
||||
pathDone = true;
|
||||
// The post() failed, so we won't get a thread to let
|
||||
// the Coro finish. We'll call Coro::resume() so the
|
||||
// Coro can finish on our thread. Otherwise the
|
||||
// application will hang on shutdown.
|
||||
coroCopy->resume();
|
||||
}
|
||||
cv.notify_one();
|
||||
},
|
||||
context.consumer,
|
||||
lpLedger,
|
||||
context.params);
|
||||
if (request)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
std::unique_lock lk(mtx);
|
||||
if (!cv.wait_for(lk, 30s, [&] { return pathDone; }))
|
||||
{
|
||||
// Path-finding continuation never fired (e.g. shutdown
|
||||
// race or unexpected failure). Return an internal error
|
||||
// rather than blocking the RPC thread indefinitely.
|
||||
return rpcError(rpcINTERNAL);
|
||||
}
|
||||
context.coro->yield();
|
||||
jvResult = request->doStatus(context.params);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user